1 /* 2 * Copyright 2016 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.grpclb; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 import static com.google.common.truth.Truth.assertThat; 21 import static io.grpc.ConnectivityState.CONNECTING; 22 import static io.grpc.ConnectivityState.IDLE; 23 import static io.grpc.ConnectivityState.READY; 24 import static io.grpc.ConnectivityState.SHUTDOWN; 25 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; 26 import static io.grpc.grpclb.GrpclbState.BUFFER_ENTRY; 27 import static io.grpc.grpclb.GrpclbState.DROP_PICK_RESULT; 28 import static io.grpc.grpclb.GrpclbState.NO_USE_AUTHORITY_SUFFIX; 29 import static org.junit.Assert.assertEquals; 30 import static org.junit.Assert.assertFalse; 31 import static org.junit.Assert.assertNull; 32 import static org.junit.Assert.assertSame; 33 import static org.junit.Assert.assertTrue; 34 import static org.mockito.AdditionalAnswers.delegatesTo; 35 import static org.mockito.ArgumentMatchers.any; 36 import static org.mockito.ArgumentMatchers.anyString; 37 import static org.mockito.ArgumentMatchers.eq; 38 import static org.mockito.ArgumentMatchers.same; 39 import static org.mockito.Mockito.atLeast; 40 import static org.mockito.Mockito.doAnswer; 41 import static org.mockito.Mockito.inOrder; 42 import static org.mockito.Mockito.mock; 43 import static org.mockito.Mockito.never; 44 import static org.mockito.Mockito.times; 45 import static org.mockito.Mockito.verify; 46 import static org.mockito.Mockito.verifyNoMoreInteractions; 47 import static org.mockito.Mockito.when; 48 49 import com.google.common.collect.Iterables; 50 import com.google.protobuf.ByteString; 51 import com.google.protobuf.util.Durations; 52 import com.google.protobuf.util.Timestamps; 53 import io.grpc.Attributes; 54 import io.grpc.ChannelLogger; 55 import io.grpc.ClientStreamTracer; 56 import io.grpc.ConnectivityState; 57 import io.grpc.ConnectivityStateInfo; 58 import io.grpc.Context; 59 import io.grpc.Context.CancellableContext; 60 import io.grpc.EquivalentAddressGroup; 61 import io.grpc.LoadBalancer.CreateSubchannelArgs; 62 import io.grpc.LoadBalancer.Helper; 63 import io.grpc.LoadBalancer.PickResult; 64 import io.grpc.LoadBalancer.PickSubchannelArgs; 65 import io.grpc.LoadBalancer.ResolvedAddresses; 66 import io.grpc.LoadBalancer.Subchannel; 67 import io.grpc.LoadBalancer.SubchannelPicker; 68 import io.grpc.LoadBalancer.SubchannelStateListener; 69 import io.grpc.ManagedChannel; 70 import io.grpc.Metadata; 71 import io.grpc.Status; 72 import io.grpc.Status.Code; 73 import io.grpc.SynchronizationContext; 74 import io.grpc.grpclb.GrpclbState.BackendEntry; 75 import io.grpc.grpclb.GrpclbState.DropEntry; 76 import io.grpc.grpclb.GrpclbState.ErrorEntry; 77 import io.grpc.grpclb.GrpclbState.IdleSubchannelEntry; 78 import io.grpc.grpclb.GrpclbState.Mode; 79 import io.grpc.grpclb.GrpclbState.RoundRobinPicker; 80 import io.grpc.inprocess.InProcessChannelBuilder; 81 import io.grpc.inprocess.InProcessServerBuilder; 82 import io.grpc.internal.BackoffPolicy; 83 import io.grpc.internal.FakeClock; 84 import io.grpc.lb.v1.ClientStats; 85 import io.grpc.lb.v1.ClientStatsPerToken; 86 import io.grpc.lb.v1.FallbackResponse; 87 import io.grpc.lb.v1.InitialLoadBalanceRequest; 88 import io.grpc.lb.v1.InitialLoadBalanceResponse; 89 import io.grpc.lb.v1.LoadBalanceRequest; 90 import io.grpc.lb.v1.LoadBalanceResponse; 91 import io.grpc.lb.v1.LoadBalancerGrpc; 92 import io.grpc.lb.v1.Server; 93 import io.grpc.lb.v1.ServerList; 94 import io.grpc.stub.StreamObserver; 95 import java.net.InetSocketAddress; 96 import java.net.SocketAddress; 97 import java.text.MessageFormat; 98 import java.util.ArrayDeque; 99 import java.util.ArrayList; 100 import java.util.Arrays; 101 import java.util.Collections; 102 import java.util.List; 103 import java.util.concurrent.ScheduledExecutorService; 104 import java.util.concurrent.TimeUnit; 105 import javax.annotation.Nonnull; 106 import javax.annotation.Nullable; 107 import org.junit.After; 108 import org.junit.Before; 109 import org.junit.Rule; 110 import org.junit.Test; 111 import org.junit.runner.RunWith; 112 import org.junit.runners.JUnit4; 113 import org.mockito.AdditionalAnswers; 114 import org.mockito.ArgumentCaptor; 115 import org.mockito.ArgumentMatchers; 116 import org.mockito.Captor; 117 import org.mockito.InOrder; 118 import org.mockito.Mock; 119 import org.mockito.invocation.InvocationOnMock; 120 import org.mockito.junit.MockitoJUnit; 121 import org.mockito.junit.MockitoRule; 122 import org.mockito.stubbing.Answer; 123 124 /** Unit tests for {@link GrpclbLoadBalancer}. */ 125 @RunWith(JUnit4.class) 126 public class GrpclbLoadBalancerTest { 127 @Rule public final MockitoRule mocks = MockitoJUnit.rule(); 128 129 private static final String SERVICE_AUTHORITY = "api.google.com"; 130 131 // The tasks are wrapped by SynchronizationContext, so we can't compare the types 132 // directly. 133 private static final FakeClock.TaskFilter LOAD_REPORTING_TASK_FILTER = 134 new FakeClock.TaskFilter() { 135 @Override 136 public boolean shouldAccept(Runnable command) { 137 return command.toString().contains(GrpclbState.LoadReportingTask.class.getSimpleName()); 138 } 139 }; 140 private static final FakeClock.TaskFilter FALLBACK_MODE_TASK_FILTER = 141 new FakeClock.TaskFilter() { 142 @Override 143 public boolean shouldAccept(Runnable command) { 144 return command.toString().contains(GrpclbState.FallbackModeTask.class.getSimpleName()); 145 } 146 }; 147 private static final FakeClock.TaskFilter LB_RPC_RETRY_TASK_FILTER = 148 new FakeClock.TaskFilter() { 149 @Override 150 public boolean shouldAccept(Runnable command) { 151 return command.toString().contains(GrpclbState.LbRpcRetryTask.class.getSimpleName()); 152 } 153 }; 154 private static final Attributes LB_BACKEND_ATTRS = 155 Attributes.newBuilder().set(GrpclbConstants.ATTR_LB_PROVIDED_BACKEND, true).build(); 156 157 private final Helper helper = mock(Helper.class, delegatesTo(new FakeHelper())); 158 private final SubchannelPool subchannelPool = 159 mock( 160 SubchannelPool.class, 161 delegatesTo(new CachedSubchannelPool(helper))); 162 private final ArrayList<String> logs = new ArrayList<>(); 163 private final ChannelLogger channelLogger = new ChannelLogger() { 164 @Override 165 public void log(ChannelLogLevel level, String msg) { 166 logs.add(level + ": " + msg); 167 } 168 169 @Override 170 public void log(ChannelLogLevel level, String template, Object... args) { 171 log(level, MessageFormat.format(template, args)); 172 } 173 }; 174 private SubchannelPicker currentPicker; 175 private LoadBalancerGrpc.LoadBalancerImplBase mockLbService; 176 @Captor 177 private ArgumentCaptor<StreamObserver<LoadBalanceResponse>> lbResponseObserverCaptor; 178 private final FakeClock fakeClock = new FakeClock(); 179 private final ArrayDeque<StreamObserver<LoadBalanceRequest>> lbRequestObservers = 180 new ArrayDeque<>(); 181 private final ArrayDeque<Subchannel> mockSubchannels = new ArrayDeque<>(); 182 private final ArrayDeque<ManagedChannel> fakeOobChannels = new ArrayDeque<>(); 183 private final ArrayList<Subchannel> unpooledSubchannelTracker = new ArrayList<>(); 184 private final ArrayList<ManagedChannel> oobChannelTracker = new ArrayList<>(); 185 private final SynchronizationContext syncContext = new SynchronizationContext( 186 new Thread.UncaughtExceptionHandler() { 187 @Override 188 public void uncaughtException(Thread t, Throwable e) { 189 throw new AssertionError(e); 190 } 191 }); 192 private static final ClientStreamTracer.StreamInfo STREAM_INFO = 193 ClientStreamTracer.StreamInfo.newBuilder().build(); 194 195 private io.grpc.Server fakeLbServer; 196 @Captor 197 private ArgumentCaptor<SubchannelPicker> pickerCaptor; 198 @Mock 199 private BackoffPolicy.Provider backoffPolicyProvider; 200 @Mock 201 private BackoffPolicy backoffPolicy1; 202 @Mock 203 private BackoffPolicy backoffPolicy2; 204 private GrpclbLoadBalancer balancer; 205 private final ArgumentCaptor<CreateSubchannelArgs> createSubchannelArgsCaptor = 206 ArgumentCaptor.forClass(CreateSubchannelArgs.class); 207 208 @Before setUp()209 public void setUp() throws Exception { 210 mockLbService = mock(LoadBalancerGrpc.LoadBalancerImplBase.class, delegatesTo( 211 new LoadBalancerGrpc.LoadBalancerImplBase() { 212 @Override 213 @SuppressWarnings("unchecked") 214 public StreamObserver<LoadBalanceRequest> balanceLoad( 215 final StreamObserver<LoadBalanceResponse> responseObserver) { 216 StreamObserver<LoadBalanceRequest> requestObserver = 217 mock(StreamObserver.class); 218 Answer<Void> closeRpc = new Answer<Void>() { 219 @Override 220 public Void answer(InvocationOnMock invocation) { 221 responseObserver.onCompleted(); 222 return null; 223 } 224 }; 225 doAnswer(closeRpc).when(requestObserver).onCompleted(); 226 lbRequestObservers.add(requestObserver); 227 return requestObserver; 228 } 229 })); 230 fakeLbServer = InProcessServerBuilder.forName("fakeLb") 231 .directExecutor().addService(mockLbService).build().start(); 232 when(backoffPolicy1.nextBackoffNanos()).thenReturn(10L, 100L); 233 when(backoffPolicy2.nextBackoffNanos()).thenReturn(10L, 100L); 234 when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2); 235 balancer = new GrpclbLoadBalancer( 236 helper, 237 Context.ROOT, 238 subchannelPool, 239 fakeClock.getTimeProvider(), 240 fakeClock.getStopwatchSupplier().get(), 241 backoffPolicyProvider); 242 } 243 244 @After tearDown()245 public void tearDown() { 246 try { 247 if (balancer != null) { 248 syncContext.execute(new Runnable() { 249 @Override 250 public void run() { 251 balancer.shutdown(); 252 } 253 }); 254 } 255 for (ManagedChannel channel : oobChannelTracker) { 256 assertTrue(channel + " is shutdown", channel.isShutdown()); 257 // balancer should have closed the LB stream, terminating the OOB channel. 258 assertTrue(channel + " is terminated", channel.isTerminated()); 259 } 260 for (Subchannel subchannel : unpooledSubchannelTracker) { 261 verify(subchannel).shutdown(); 262 } 263 // No timer should linger after shutdown 264 assertThat(fakeClock.getPendingTasks()).isEmpty(); 265 } finally { 266 if (fakeLbServer != null) { 267 fakeLbServer.shutdownNow(); 268 } 269 } 270 } 271 272 @Test roundRobinPickerNoDrop()273 public void roundRobinPickerNoDrop() { 274 GrpclbClientLoadRecorder loadRecorder = 275 new GrpclbClientLoadRecorder(fakeClock.getTimeProvider()); 276 Subchannel subchannel = mock(Subchannel.class); 277 BackendEntry b1 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0001"); 278 BackendEntry b2 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0002"); 279 280 List<BackendEntry> pickList = Arrays.asList(b1, b2); 281 RoundRobinPicker picker = new RoundRobinPicker(Collections.<DropEntry>emptyList(), pickList); 282 283 PickSubchannelArgs args1 = mock(PickSubchannelArgs.class); 284 Metadata headers1 = new Metadata(); 285 // The existing token on the headers will be replaced 286 headers1.put(GrpclbConstants.TOKEN_METADATA_KEY, "LBTOKEN__OLD"); 287 when(args1.getHeaders()).thenReturn(headers1); 288 assertSame(b1.result, picker.pickSubchannel(args1)); 289 verify(args1).getHeaders(); 290 assertThat(headers1.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001"); 291 292 PickSubchannelArgs args2 = mock(PickSubchannelArgs.class); 293 Metadata headers2 = new Metadata(); 294 when(args2.getHeaders()).thenReturn(headers2); 295 assertSame(b2.result, picker.pickSubchannel(args2)); 296 verify(args2).getHeaders(); 297 assertThat(headers2.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0002"); 298 299 PickSubchannelArgs args3 = mock(PickSubchannelArgs.class); 300 Metadata headers3 = new Metadata(); 301 when(args3.getHeaders()).thenReturn(headers3); 302 assertSame(b1.result, picker.pickSubchannel(args3)); 303 verify(args3).getHeaders(); 304 assertThat(headers3.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001"); 305 306 verify(subchannel, never()).getAttributes(); 307 } 308 309 @Test roundRobinPickerWithDrop()310 public void roundRobinPickerWithDrop() { 311 assertTrue(DROP_PICK_RESULT.isDrop()); 312 GrpclbClientLoadRecorder loadRecorder = 313 new GrpclbClientLoadRecorder(fakeClock.getTimeProvider()); 314 Subchannel subchannel = mock(Subchannel.class); 315 // 1 out of 2 requests are to be dropped 316 DropEntry d = new DropEntry(loadRecorder, "LBTOKEN0003"); 317 List<DropEntry> dropList = Arrays.asList(null, d); 318 319 BackendEntry b1 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0001"); 320 BackendEntry b2 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0002"); 321 List<BackendEntry> pickList = Arrays.asList(b1, b2); 322 RoundRobinPicker picker = new RoundRobinPicker(dropList, pickList); 323 324 // dropList[0], pickList[0] 325 PickSubchannelArgs args1 = mock(PickSubchannelArgs.class); 326 Metadata headers1 = new Metadata(); 327 headers1.put(GrpclbConstants.TOKEN_METADATA_KEY, "LBTOKEN__OLD"); 328 when(args1.getHeaders()).thenReturn(headers1); 329 assertSame(b1.result, picker.pickSubchannel(args1)); 330 verify(args1).getHeaders(); 331 assertThat(headers1.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001"); 332 333 // dropList[1]: drop 334 PickSubchannelArgs args2 = mock(PickSubchannelArgs.class); 335 Metadata headers2 = new Metadata(); 336 when(args2.getHeaders()).thenReturn(headers2); 337 assertSame(DROP_PICK_RESULT, picker.pickSubchannel(args2)); 338 verify(args2, never()).getHeaders(); 339 340 // dropList[0], pickList[1] 341 PickSubchannelArgs args3 = mock(PickSubchannelArgs.class); 342 Metadata headers3 = new Metadata(); 343 when(args3.getHeaders()).thenReturn(headers3); 344 assertSame(b2.result, picker.pickSubchannel(args3)); 345 verify(args3).getHeaders(); 346 assertThat(headers3.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0002"); 347 348 // dropList[1]: drop 349 PickSubchannelArgs args4 = mock(PickSubchannelArgs.class); 350 Metadata headers4 = new Metadata(); 351 when(args4.getHeaders()).thenReturn(headers4); 352 assertSame(DROP_PICK_RESULT, picker.pickSubchannel(args4)); 353 verify(args4, never()).getHeaders(); 354 355 // dropList[0], pickList[0] 356 PickSubchannelArgs args5 = mock(PickSubchannelArgs.class); 357 Metadata headers5 = new Metadata(); 358 when(args5.getHeaders()).thenReturn(headers5); 359 assertSame(b1.result, picker.pickSubchannel(args5)); 360 verify(args5).getHeaders(); 361 assertThat(headers5.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001"); 362 363 verify(subchannel, never()).getAttributes(); 364 } 365 366 @Test roundRobinPickerWithIdleEntry_noDrop()367 public void roundRobinPickerWithIdleEntry_noDrop() { 368 Subchannel subchannel = mock(Subchannel.class); 369 IdleSubchannelEntry entry = new IdleSubchannelEntry(subchannel, syncContext); 370 371 RoundRobinPicker picker = 372 new RoundRobinPicker(Collections.<DropEntry>emptyList(), Collections.singletonList(entry)); 373 PickSubchannelArgs args = mock(PickSubchannelArgs.class); 374 375 verify(subchannel, never()).requestConnection(); 376 assertThat(picker.pickSubchannel(args)).isSameInstanceAs(PickResult.withNoResult()); 377 verify(subchannel).requestConnection(); 378 assertThat(picker.pickSubchannel(args)).isSameInstanceAs(PickResult.withNoResult()); 379 // Only the first pick triggers requestConnection() 380 verify(subchannel).requestConnection(); 381 } 382 383 @Test roundRobinPickerWithIdleEntry_andDrop()384 public void roundRobinPickerWithIdleEntry_andDrop() { 385 GrpclbClientLoadRecorder loadRecorder = 386 new GrpclbClientLoadRecorder(fakeClock.getTimeProvider()); 387 // 1 out of 2 requests are to be dropped 388 DropEntry d = new DropEntry(loadRecorder, "LBTOKEN0003"); 389 List<DropEntry> dropList = Arrays.asList(null, d); 390 391 Subchannel subchannel = mock(Subchannel.class); 392 IdleSubchannelEntry entry = new IdleSubchannelEntry(subchannel, syncContext); 393 394 RoundRobinPicker picker = new RoundRobinPicker(dropList, Collections.singletonList(entry)); 395 PickSubchannelArgs args = mock(PickSubchannelArgs.class); 396 397 verify(subchannel, never()).requestConnection(); 398 assertThat(picker.pickSubchannel(args)).isSameInstanceAs(PickResult.withNoResult()); 399 verify(subchannel).requestConnection(); 400 401 assertThat(picker.pickSubchannel(args)).isSameInstanceAs(DROP_PICK_RESULT); 402 403 verify(subchannel).requestConnection(); 404 assertThat(picker.pickSubchannel(args)).isSameInstanceAs(PickResult.withNoResult()); 405 // Only the first pick triggers requestConnection() 406 verify(subchannel).requestConnection(); 407 } 408 409 @Test loadReporting()410 public void loadReporting() { 411 Metadata headers = new Metadata(); 412 PickSubchannelArgs args = mock(PickSubchannelArgs.class); 413 when(args.getHeaders()).thenReturn(headers); 414 415 long loadReportIntervalMillis = 1983; 416 List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1); 417 deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList); 418 419 // Fallback timer is started as soon as address is resolved. 420 assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); 421 422 assertEquals(1, fakeOobChannels.size()); 423 verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 424 StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); 425 assertEquals(1, lbRequestObservers.size()); 426 StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); 427 InOrder inOrder = inOrder(lbRequestObserver); 428 InOrder helperInOrder = inOrder(helper, subchannelPool); 429 430 inOrder.verify(lbRequestObserver).onNext( 431 eq(LoadBalanceRequest.newBuilder().setInitialRequest( 432 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 433 .build())); 434 435 // Simulate receiving LB response 436 assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); 437 lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); 438 439 // Load reporting task is scheduled 440 assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); 441 assertEquals(0, fakeClock.runDueTasks()); 442 443 List<ServerEntry> backends = Arrays.asList( 444 new ServerEntry("127.0.0.1", 2000, "token0001"), 445 new ServerEntry("token0001"), // drop 446 new ServerEntry("127.0.0.1", 2010, "token0002"), 447 new ServerEntry("token0003")); // drop 448 449 lbResponseObserver.onNext(buildLbResponse(backends)); 450 451 assertEquals(2, mockSubchannels.size()); 452 Subchannel subchannel1 = mockSubchannels.poll(); 453 Subchannel subchannel2 = mockSubchannels.poll(); 454 deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); 455 deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING)); 456 deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); 457 deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY)); 458 459 helperInOrder.verify(helper, atLeast(1)) 460 .updateBalancingState(eq(READY), pickerCaptor.capture()); 461 RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); 462 assertThat(picker.dropList).containsExactly( 463 null, 464 new DropEntry(getLoadRecorder(), "token0001"), 465 null, 466 new DropEntry(getLoadRecorder(), "token0003")).inOrder(); 467 assertThat(picker.pickList).containsExactly( 468 new BackendEntry(subchannel1, getLoadRecorder(), "token0001"), 469 new BackendEntry(subchannel2, getLoadRecorder(), "token0002")).inOrder(); 470 471 // Report, no data 472 assertNextReport( 473 inOrder, lbRequestObserver, loadReportIntervalMillis, 474 ClientStats.newBuilder().build()); 475 476 PickResult pick1 = picker.pickSubchannel(args); 477 assertSame(subchannel1, pick1.getSubchannel()); 478 assertSame(getLoadRecorder(), pick1.getStreamTracerFactory()); 479 480 // Merely the pick will not be recorded as upstart. 481 assertNextReport( 482 inOrder, lbRequestObserver, loadReportIntervalMillis, 483 ClientStats.newBuilder().build()); 484 485 ClientStreamTracer tracer1 = 486 pick1.getStreamTracerFactory().newClientStreamTracer(STREAM_INFO, new Metadata()); 487 tracer1.streamCreated(Attributes.EMPTY, new Metadata()); 488 489 PickResult pick2 = picker.pickSubchannel(args); 490 assertNull(pick2.getSubchannel()); 491 assertSame(DROP_PICK_RESULT, pick2); 492 493 // Report includes upstart of pick1 and the drop of pick2 494 assertNextReport( 495 inOrder, lbRequestObserver, loadReportIntervalMillis, 496 ClientStats.newBuilder() 497 .setNumCallsStarted(2) 498 .setNumCallsFinished(1) // pick2 499 .addCallsFinishedWithDrop( 500 ClientStatsPerToken.newBuilder() 501 .setLoadBalanceToken("token0001") 502 .setNumCalls(1) // pick2 503 .build()) 504 .build()); 505 506 PickResult pick3 = picker.pickSubchannel(args); 507 assertSame(subchannel2, pick3.getSubchannel()); 508 assertSame(getLoadRecorder(), pick3.getStreamTracerFactory()); 509 ClientStreamTracer tracer3 = 510 pick3.getStreamTracerFactory().newClientStreamTracer(STREAM_INFO, new Metadata()); 511 tracer3.streamCreated(Attributes.EMPTY, new Metadata()); 512 513 // pick3 has sent out headers 514 tracer3.outboundHeaders(); 515 516 // 3rd report includes pick3's upstart 517 assertNextReport( 518 inOrder, lbRequestObserver, loadReportIntervalMillis, 519 ClientStats.newBuilder() 520 .setNumCallsStarted(1) 521 .build()); 522 523 PickResult pick4 = picker.pickSubchannel(args); 524 assertNull(pick4.getSubchannel()); 525 assertSame(DROP_PICK_RESULT, pick4); 526 527 // pick1 ended without sending anything 528 tracer1.streamClosed(Status.CANCELLED); 529 530 // 4th report includes end of pick1 and drop of pick4 531 assertNextReport( 532 inOrder, lbRequestObserver, loadReportIntervalMillis, 533 ClientStats.newBuilder() 534 .setNumCallsStarted(1) // pick4 535 .setNumCallsFinished(2) 536 .setNumCallsFinishedWithClientFailedToSend(1) // pick1 537 .addCallsFinishedWithDrop( 538 ClientStatsPerToken.newBuilder() 539 .setLoadBalanceToken("token0003") 540 .setNumCalls(1) // pick4 541 .build()) 542 .build()); 543 544 PickResult pick5 = picker.pickSubchannel(args); 545 assertSame(subchannel1, pick1.getSubchannel()); 546 assertSame(getLoadRecorder(), pick5.getStreamTracerFactory()); 547 ClientStreamTracer tracer5 = 548 pick5.getStreamTracerFactory().newClientStreamTracer(STREAM_INFO, new Metadata()); 549 tracer5.streamCreated(Attributes.EMPTY, new Metadata()); 550 551 // pick3 ended without receiving response headers 552 tracer3.streamClosed(Status.DEADLINE_EXCEEDED); 553 554 // pick5 sent and received headers 555 tracer5.outboundHeaders(); 556 tracer5.inboundHeaders(); 557 558 // 5th report includes pick3's end and pick5's upstart 559 assertNextReport( 560 inOrder, lbRequestObserver, loadReportIntervalMillis, 561 ClientStats.newBuilder() 562 .setNumCallsStarted(1) // pick5 563 .setNumCallsFinished(1) // pick3 564 .build()); 565 566 // pick5 ends 567 tracer5.streamClosed(Status.OK); 568 569 // 6th report includes pick5's end 570 assertNextReport( 571 inOrder, lbRequestObserver, loadReportIntervalMillis, 572 ClientStats.newBuilder() 573 .setNumCallsFinished(1) 574 .setNumCallsFinishedKnownReceived(1) 575 .build()); 576 577 assertEquals(1, fakeClock.numPendingTasks()); 578 // Balancer closes the stream, scheduled reporting task cancelled 579 lbResponseObserver.onError(Status.UNAVAILABLE.asException()); 580 assertEquals(0, fakeClock.numPendingTasks()); 581 582 // New stream created 583 verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); 584 lbResponseObserver = lbResponseObserverCaptor.getValue(); 585 assertEquals(1, lbRequestObservers.size()); 586 lbRequestObserver = lbRequestObservers.poll(); 587 inOrder = inOrder(lbRequestObserver); 588 589 inOrder.verify(lbRequestObserver).onNext( 590 eq(LoadBalanceRequest.newBuilder().setInitialRequest( 591 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 592 .build())); 593 594 // Load reporting is also requested 595 lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); 596 597 // No picker created because balancer is still using the results from the last stream 598 helperInOrder.verify(helper, never()) 599 .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); 600 601 // Make a new pick on that picker. It will not show up on the report of the new stream, because 602 // that picker is associated with the previous stream. 603 PickResult pick6 = picker.pickSubchannel(args); 604 assertNull(pick6.getSubchannel()); 605 assertSame(DROP_PICK_RESULT, pick6); 606 assertNextReport( 607 inOrder, lbRequestObserver, loadReportIntervalMillis, 608 ClientStats.newBuilder().build()); 609 610 // New stream got the list update 611 lbResponseObserver.onNext(buildLbResponse(backends)); 612 613 // Same backends, thus no new subchannels 614 helperInOrder.verify(subchannelPool, never()).takeOrCreateSubchannel( 615 any(EquivalentAddressGroup.class), any(Attributes.class)); 616 // But the new RoundRobinEntries have a new loadRecorder, thus considered different from 617 // the previous list, thus a new picker is created 618 helperInOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); 619 picker = (RoundRobinPicker) pickerCaptor.getValue(); 620 621 PickResult pick1p = picker.pickSubchannel(args); 622 assertSame(subchannel1, pick1p.getSubchannel()); 623 assertSame(getLoadRecorder(), pick1p.getStreamTracerFactory()); 624 pick1p.getStreamTracerFactory().newClientStreamTracer(STREAM_INFO, new Metadata()); 625 626 // The pick from the new stream will be included in the report 627 assertNextReport( 628 inOrder, lbRequestObserver, loadReportIntervalMillis, 629 ClientStats.newBuilder() 630 .setNumCallsStarted(1) 631 .build()); 632 633 verify(args, atLeast(0)).getHeaders(); 634 verifyNoMoreInteractions(args); 635 } 636 637 @Test abundantInitialResponse()638 public void abundantInitialResponse() { 639 List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1); 640 deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList); 641 assertEquals(1, fakeOobChannels.size()); 642 verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 643 StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); 644 645 // Simulate LB initial response 646 assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); 647 lbResponseObserver.onNext(buildInitialResponse(1983)); 648 649 // Load reporting task is scheduled 650 assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); 651 FakeClock.ScheduledTask scheduledTask = 652 Iterables.getOnlyElement(fakeClock.getPendingTasks(LOAD_REPORTING_TASK_FILTER)); 653 assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS)); 654 655 logs.clear(); 656 // Simulate an abundant LB initial response, with a different report interval 657 lbResponseObserver.onNext(buildInitialResponse(9097)); 658 659 // This incident is logged 660 assertThat(logs).containsExactly( 661 "DEBUG: [grpclb-<api.google.com>] Got an LB response: " + buildInitialResponse(9097), 662 "WARNING: [grpclb-<api.google.com>] " 663 + "Ignoring unexpected response type: INITIAL_RESPONSE") 664 .inOrder(); 665 666 // It doesn't affect load-reporting at all 667 assertThat(fakeClock.getPendingTasks(LOAD_REPORTING_TASK_FILTER)) 668 .containsExactly(scheduledTask); 669 assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS)); 670 } 671 672 @SuppressWarnings("unchecked") 673 @Test raceBetweenHandleAddressesAndLbStreamClosure()674 public void raceBetweenHandleAddressesAndLbStreamClosure() { 675 InOrder inOrder = inOrder(mockLbService, backoffPolicyProvider, backoffPolicy1); 676 deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), 677 createResolvedBalancerAddresses(1)); 678 assertEquals(1, fakeOobChannels.size()); 679 inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 680 StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); 681 assertEquals(1, lbRequestObservers.size()); 682 StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); 683 verify(lbRequestObserver).onNext( 684 eq(LoadBalanceRequest.newBuilder().setInitialRequest( 685 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 686 .build())); 687 688 // Close lbStream 689 lbResponseObserver.onCompleted(); 690 inOrder.verify(backoffPolicyProvider).get(); 691 inOrder.verify(backoffPolicy1).nextBackoffNanos(); 692 // Retry task scheduled 693 assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); 694 FakeClock.ScheduledTask retryTask = 695 Iterables.getOnlyElement(fakeClock.getPendingTasks(LB_RPC_RETRY_TASK_FILTER)); 696 assertEquals(10L, retryTask.getDelay(TimeUnit.NANOSECONDS)); 697 698 // Receive the same Lb address again 699 deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), 700 createResolvedBalancerAddresses(1)); 701 // Retry task cancelled 702 assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); 703 // Reuse the existing OOB channel 704 assertEquals(1, fakeOobChannels.size()); 705 // Start a new LoadBalance RPC 706 inOrder.verify(mockLbService).balanceLoad(any(StreamObserver.class)); 707 assertEquals(1, lbRequestObservers.size()); 708 lbRequestObserver = lbRequestObservers.poll(); 709 verify(lbRequestObserver).onNext( 710 eq(LoadBalanceRequest.newBuilder().setInitialRequest( 711 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 712 .build())); 713 714 // Simulate a race condition where the task has just started when it's cancelled 715 retryTask.command.run(); 716 inOrder.verifyNoMoreInteractions(); 717 } 718 719 @Test raceBetweenLoadReportingAndLbStreamClosure()720 public void raceBetweenLoadReportingAndLbStreamClosure() { 721 List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1); 722 deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList); 723 assertEquals(1, fakeOobChannels.size()); 724 verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 725 StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); 726 assertEquals(1, lbRequestObservers.size()); 727 StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); 728 InOrder inOrder = inOrder(lbRequestObserver); 729 730 inOrder.verify(lbRequestObserver).onNext( 731 eq(LoadBalanceRequest.newBuilder().setInitialRequest( 732 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 733 .build())); 734 735 // Simulate receiving LB response 736 assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); 737 lbResponseObserver.onNext(buildInitialResponse(1983)); 738 // Load reporting task is scheduled 739 assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); 740 FakeClock.ScheduledTask scheduledTask = 741 Iterables.getOnlyElement(fakeClock.getPendingTasks(LOAD_REPORTING_TASK_FILTER)); 742 assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS)); 743 744 // Close lbStream 745 lbResponseObserver.onCompleted(); 746 747 // Reporting task cancelled 748 assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); 749 750 // Simulate a race condition where the task has just started when its cancelled 751 scheduledTask.command.run(); 752 753 // No report sent. No new task scheduled 754 inOrder.verify(lbRequestObserver, never()).onNext(any(LoadBalanceRequest.class)); 755 assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); 756 } 757 assertNextReport( InOrder inOrder, StreamObserver<LoadBalanceRequest> lbRequestObserver, long loadReportIntervalMillis, ClientStats expectedReport)758 private void assertNextReport( 759 InOrder inOrder, StreamObserver<LoadBalanceRequest> lbRequestObserver, 760 long loadReportIntervalMillis, ClientStats expectedReport) { 761 assertEquals(0, fakeClock.forwardTime(loadReportIntervalMillis - 1, TimeUnit.MILLISECONDS)); 762 inOrder.verifyNoMoreInteractions(); 763 assertEquals(1, fakeClock.forwardTime(1, TimeUnit.MILLISECONDS)); 764 assertEquals(1, fakeClock.numPendingTasks()); 765 inOrder.verify(lbRequestObserver).onNext( 766 eq(LoadBalanceRequest.newBuilder() 767 .setClientStats( 768 ClientStats.newBuilder(expectedReport) 769 .setTimestamp(Timestamps.fromNanos(fakeClock.getTicker().read())) 770 .build()) 771 .build())); 772 } 773 774 @Test receiveNoBackendAndBalancerAddress()775 public void receiveNoBackendAndBalancerAddress() { 776 deliverResolvedAddresses( 777 Collections.<EquivalentAddressGroup>emptyList(), 778 Collections.<EquivalentAddressGroup>emptyList()); 779 verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); 780 RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); 781 assertThat(picker.dropList).isEmpty(); 782 Status error = Iterables.getOnlyElement(picker.pickList).picked(new Metadata()).getStatus(); 783 assertThat(error.getCode()).isEqualTo(Code.UNAVAILABLE); 784 assertThat(error.getDescription()).isEqualTo("No backend or balancer addresses found"); 785 } 786 787 @Test nameResolutionFailsThenRecover()788 public void nameResolutionFailsThenRecover() { 789 Status error = Status.NOT_FOUND.withDescription("www.google.com not found"); 790 deliverNameResolutionError(error); 791 verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); 792 assertThat(logs) 793 .containsExactly( 794 "INFO: [grpclb-<api.google.com>] Created", 795 "DEBUG: [grpclb-<api.google.com>] Error: " + error) 796 .inOrder(); 797 logs.clear(); 798 799 RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); 800 assertThat(picker.dropList).isEmpty(); 801 PickResult result = picker.pickSubchannel(mock(PickSubchannelArgs.class)); 802 assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); 803 assertThat(result.getStatus().getDescription()).isEqualTo(error.getDescription()); 804 805 // Recover with a subsequent success 806 List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1); 807 808 deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList); 809 810 verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)), 811 eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); 812 verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 813 } 814 815 @Test grpclbThenNameResolutionFails()816 public void grpclbThenNameResolutionFails() { 817 InOrder inOrder = inOrder(helper, subchannelPool); 818 // Go to GRPCLB first 819 List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1); 820 deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList); 821 822 verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)), 823 eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); 824 assertEquals(1, fakeOobChannels.size()); 825 ManagedChannel oobChannel = fakeOobChannels.poll(); 826 verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 827 StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); 828 829 // Let name resolution fail before round-robin list is ready 830 Status error = Status.NOT_FOUND.withDescription("www.google.com not found"); 831 deliverNameResolutionError(error); 832 833 inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); 834 RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); 835 assertThat(picker.dropList).isEmpty(); 836 PickResult result = picker.pickSubchannel(mock(PickSubchannelArgs.class)); 837 assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); 838 assertThat(result.getStatus().getDescription()).isEqualTo(error.getDescription()); 839 assertFalse(oobChannel.isShutdown()); 840 841 // Simulate receiving LB response 842 List<ServerEntry> backends = Arrays.asList( 843 new ServerEntry("127.0.0.1", 2000, "TOKEN1"), 844 new ServerEntry("127.0.0.1", 2010, "TOKEN2")); 845 lbResponseObserver.onNext(buildInitialResponse()); 846 lbResponseObserver.onNext(buildLbResponse(backends)); 847 848 inOrder.verify(subchannelPool).takeOrCreateSubchannel( 849 eq(new EquivalentAddressGroup(backends.get(0).addr, LB_BACKEND_ATTRS)), 850 any(Attributes.class)); 851 inOrder.verify(subchannelPool).takeOrCreateSubchannel( 852 eq(new EquivalentAddressGroup(backends.get(1).addr, LB_BACKEND_ATTRS)), 853 any(Attributes.class)); 854 } 855 856 @Test grpclbUpdatedAddresses_avoidsReconnect()857 public void grpclbUpdatedAddresses_avoidsReconnect() { 858 List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(1); 859 List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1); 860 deliverResolvedAddresses(backendList, grpclbBalancerList); 861 862 verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)), 863 eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); 864 ManagedChannel oobChannel = fakeOobChannels.poll(); 865 assertEquals(1, lbRequestObservers.size()); 866 867 List<EquivalentAddressGroup> backendList2 = createResolvedBackendAddresses(1); 868 List<EquivalentAddressGroup> grpclbBalancerList2 = createResolvedBalancerAddresses(2); 869 deliverResolvedAddresses(backendList2, grpclbBalancerList2); 870 verify(helper).updateOobChannelAddresses(eq(oobChannel), eq(xattr(grpclbBalancerList2))); 871 assertEquals(1, lbRequestObservers.size()); // No additional RPC 872 } 873 874 875 @Test grpclbUpdatedAddresses_reconnectOnAuthorityChange()876 public void grpclbUpdatedAddresses_reconnectOnAuthorityChange() { 877 List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(1); 878 List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1); 879 deliverResolvedAddresses(backendList, grpclbBalancerList); 880 881 verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)), 882 eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); 883 ManagedChannel oobChannel = fakeOobChannels.poll(); 884 assertEquals(1, lbRequestObservers.size()); 885 886 final String newAuthority = "some-new-authority"; 887 List<EquivalentAddressGroup> backendList2 = createResolvedBackendAddresses(1); 888 List<EquivalentAddressGroup> grpclbBalancerList2 = 889 Collections.singletonList( 890 new EquivalentAddressGroup( 891 new FakeSocketAddress("somethingNew"), lbAttributes(newAuthority))); 892 deliverResolvedAddresses(backendList2, grpclbBalancerList2); 893 verify(helper).updateOobChannelAddresses(eq(oobChannel), eq(xattr(grpclbBalancerList2))); 894 assertEquals(1, lbRequestObservers.size()); // No additional RPC 895 } 896 897 @Test grpclbWorking()898 public void grpclbWorking() { 899 InOrder inOrder = inOrder(helper, subchannelPool); 900 List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1); 901 deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList); 902 903 // Fallback timer is started as soon as the addresses are resolved. 904 assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); 905 906 verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)), 907 eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); 908 assertEquals(1, fakeOobChannels.size()); 909 ManagedChannel oobChannel = fakeOobChannels.poll(); 910 verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 911 StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); 912 assertEquals(1, lbRequestObservers.size()); 913 StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); 914 verify(lbRequestObserver).onNext( 915 eq(LoadBalanceRequest.newBuilder().setInitialRequest( 916 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 917 .build())); 918 919 // Simulate receiving LB response 920 List<ServerEntry> backends1 = Arrays.asList( 921 new ServerEntry("127.0.0.1", 2000, "token0001"), 922 new ServerEntry("127.0.0.1", 2010, "token0002")); 923 inOrder.verify(helper, never()) 924 .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); 925 logs.clear(); 926 lbResponseObserver.onNext(buildInitialResponse()); 927 assertThat(logs).containsExactly( 928 "INFO: [grpclb-<api.google.com>] Got an LB initial response: " + buildInitialResponse()); 929 logs.clear(); 930 lbResponseObserver.onNext(buildLbResponse(backends1)); 931 932 inOrder.verify(subchannelPool).takeOrCreateSubchannel( 933 eq(new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS)), 934 any(Attributes.class)); 935 inOrder.verify(subchannelPool).takeOrCreateSubchannel( 936 eq(new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS)), 937 any(Attributes.class)); 938 assertEquals(2, mockSubchannels.size()); 939 Subchannel subchannel1 = mockSubchannels.poll(); 940 Subchannel subchannel2 = mockSubchannels.poll(); 941 verify(subchannel1).requestConnection(); 942 verify(subchannel2).requestConnection(); 943 assertEquals( 944 new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS), 945 subchannel1.getAddresses()); 946 assertEquals( 947 new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS), 948 subchannel2.getAddresses()); 949 950 deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); 951 deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING)); 952 953 inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); 954 RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue(); 955 assertThat(picker0.dropList).containsExactly(null, null); 956 assertThat(picker0.pickList).containsExactly(BUFFER_ENTRY); 957 inOrder.verifyNoMoreInteractions(); 958 959 assertThat(logs).containsExactly( 960 "DEBUG: [grpclb-<api.google.com>] Got an LB response: " + buildLbResponse(backends1)) 961 .inOrder(); 962 logs.clear(); 963 964 // Let subchannels be connected 965 deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY)); 966 inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); 967 968 RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue(); 969 970 assertThat(picker1.dropList).containsExactly(null, null); 971 assertThat(picker1.pickList).containsExactly( 972 new BackendEntry(subchannel2, getLoadRecorder(), "token0002")); 973 974 deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); 975 inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); 976 977 RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue(); 978 assertThat(picker2.dropList).containsExactly(null, null); 979 assertThat(picker2.pickList).containsExactly( 980 new BackendEntry(subchannel1, getLoadRecorder(), "token0001"), 981 new BackendEntry(subchannel2, getLoadRecorder(), "token0002")) 982 .inOrder(); 983 984 // Disconnected subchannels 985 verify(subchannel1).requestConnection(); 986 deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(IDLE)); 987 verify(subchannel1, times(2)).requestConnection(); 988 inOrder.verify(helper).refreshNameResolution(); 989 inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); 990 991 RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue(); 992 assertThat(picker3.dropList).containsExactly(null, null); 993 assertThat(picker3.pickList).containsExactly( 994 new BackendEntry(subchannel2, getLoadRecorder(), "token0002")); 995 996 deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); 997 inOrder.verifyNoMoreInteractions(); 998 999 // As long as there is at least one READY subchannel, round robin will work. 1000 ConnectivityStateInfo errorState1 = 1001 ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE.withDescription("error1")); 1002 deliverSubchannelState(subchannel1, errorState1); 1003 inOrder.verify(helper).refreshNameResolution(); 1004 inOrder.verifyNoMoreInteractions(); 1005 1006 // If no subchannel is READY, some with error and the others are IDLE, will report CONNECTING 1007 verify(subchannel2).requestConnection(); 1008 deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(IDLE)); 1009 verify(subchannel2, times(2)).requestConnection(); 1010 inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); 1011 1012 RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue(); 1013 assertThat(picker4.dropList).containsExactly(null, null); 1014 assertThat(picker4.pickList).containsExactly(BUFFER_ENTRY); 1015 1016 // Update backends, with a drop entry 1017 List<ServerEntry> backends2 = 1018 Arrays.asList( 1019 new ServerEntry("127.0.0.1", 2030, "token0003"), // New address 1020 new ServerEntry("token0003"), // drop 1021 new ServerEntry("127.0.0.1", 2010, "token0004"), // Existing address with token changed 1022 new ServerEntry("127.0.0.1", 2030, "token0005"), // New address appearing second time 1023 new ServerEntry("token0006")); // drop 1024 verify(subchannelPool, never()) 1025 .returnSubchannel(same(subchannel1), any(ConnectivityStateInfo.class)); 1026 1027 lbResponseObserver.onNext(buildLbResponse(backends2)); 1028 assertThat(logs).containsExactly( 1029 "DEBUG: [grpclb-<api.google.com>] Got an LB response: " + buildLbResponse(backends2)) 1030 .inOrder(); 1031 logs.clear(); 1032 1033 // not in backends2, closed 1034 verify(subchannelPool).returnSubchannel(same(subchannel1), same(errorState1)); 1035 // backends2[2], will be kept 1036 verify(subchannelPool, never()) 1037 .returnSubchannel(same(subchannel2), any(ConnectivityStateInfo.class)); 1038 1039 inOrder.verify(subchannelPool, never()).takeOrCreateSubchannel( 1040 eq(new EquivalentAddressGroup(backends2.get(2).addr, LB_BACKEND_ATTRS)), 1041 any(Attributes.class)); 1042 inOrder.verify(subchannelPool).takeOrCreateSubchannel( 1043 eq(new EquivalentAddressGroup(backends2.get(0).addr, LB_BACKEND_ATTRS)), 1044 any(Attributes.class)); 1045 1046 ConnectivityStateInfo errorOnCachedSubchannel1 = 1047 ConnectivityStateInfo.forTransientFailure( 1048 Status.UNAVAILABLE.withDescription("You can get this error even if you are cached")); 1049 deliverSubchannelState(subchannel1, errorOnCachedSubchannel1); 1050 1051 assertEquals(1, mockSubchannels.size()); 1052 Subchannel subchannel3 = mockSubchannels.poll(); 1053 verify(subchannel3).requestConnection(); 1054 assertEquals( 1055 new EquivalentAddressGroup(backends2.get(0).addr, LB_BACKEND_ATTRS), 1056 subchannel3.getAddresses()); 1057 inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); 1058 RoundRobinPicker picker7 = (RoundRobinPicker) pickerCaptor.getValue(); 1059 assertThat(picker7.dropList).containsExactly( 1060 null, 1061 new DropEntry(getLoadRecorder(), "token0003"), 1062 null, 1063 null, 1064 new DropEntry(getLoadRecorder(), "token0006")).inOrder(); 1065 assertThat(picker7.pickList).containsExactly(BUFFER_ENTRY); 1066 1067 // State updates on obsolete subchannel1 will only be passed to the pool 1068 deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); 1069 deliverSubchannelState( 1070 subchannel1, ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE)); 1071 deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(SHUTDOWN)); 1072 1073 deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(READY)); 1074 inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); 1075 RoundRobinPicker picker8 = (RoundRobinPicker) pickerCaptor.getValue(); 1076 assertThat(picker8.dropList).containsExactly( 1077 null, 1078 new DropEntry(getLoadRecorder(), "token0003"), 1079 null, 1080 null, 1081 new DropEntry(getLoadRecorder(), "token0006")).inOrder(); 1082 // subchannel2 is still IDLE, thus not in the active list 1083 assertThat(picker8.pickList).containsExactly( 1084 new BackendEntry(subchannel3, getLoadRecorder(), "token0003"), 1085 new BackendEntry(subchannel3, getLoadRecorder(), "token0005")).inOrder(); 1086 // subchannel2 becomes READY and makes it into the list 1087 deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY)); 1088 inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); 1089 RoundRobinPicker picker9 = (RoundRobinPicker) pickerCaptor.getValue(); 1090 assertThat(picker9.dropList).containsExactly( 1091 null, 1092 new DropEntry(getLoadRecorder(), "token0003"), 1093 null, 1094 null, 1095 new DropEntry(getLoadRecorder(), "token0006")).inOrder(); 1096 assertThat(picker9.pickList).containsExactly( 1097 new BackendEntry(subchannel3, getLoadRecorder(), "token0003"), 1098 new BackendEntry(subchannel2, getLoadRecorder(), "token0004"), 1099 new BackendEntry(subchannel3, getLoadRecorder(), "token0005")).inOrder(); 1100 verify(subchannelPool, never()) 1101 .returnSubchannel(same(subchannel3), any(ConnectivityStateInfo.class)); 1102 1103 // Update backends, with no entry 1104 lbResponseObserver.onNext(buildLbResponse(Collections.<ServerEntry>emptyList())); 1105 verify(subchannelPool) 1106 .returnSubchannel(same(subchannel2), eq(ConnectivityStateInfo.forNonError(READY))); 1107 verify(subchannelPool) 1108 .returnSubchannel(same(subchannel3), eq(ConnectivityStateInfo.forNonError(READY))); 1109 inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); 1110 RoundRobinPicker picker10 = (RoundRobinPicker) pickerCaptor.getValue(); 1111 assertThat(picker10.dropList).isEmpty(); 1112 assertThat(picker10.pickList) 1113 .containsExactly(new ErrorEntry(GrpclbState.NO_AVAILABLE_BACKENDS_STATUS)); 1114 1115 assertFalse(oobChannel.isShutdown()); 1116 assertEquals(0, lbRequestObservers.size()); 1117 verify(lbRequestObserver, never()).onCompleted(); 1118 verify(lbRequestObserver, never()).onError(any(Throwable.class)); 1119 1120 // Load reporting was not requested, thus never scheduled 1121 assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); 1122 1123 verify(subchannelPool, never()).clear(); 1124 balancer.shutdown(); 1125 verify(subchannelPool).clear(); 1126 } 1127 1128 @Test roundRobinMode_subchannelStayTransientFailureUntilReady()1129 public void roundRobinMode_subchannelStayTransientFailureUntilReady() { 1130 InOrder inOrder = inOrder(helper); 1131 List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1); 1132 deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList); 1133 verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 1134 StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); 1135 1136 // Simulate receiving LB response 1137 List<ServerEntry> backends1 = Arrays.asList( 1138 new ServerEntry("127.0.0.1", 2000, "token0001"), 1139 new ServerEntry("127.0.0.1", 2010, "token0002")); 1140 lbResponseObserver.onNext(buildInitialResponse()); 1141 lbResponseObserver.onNext(buildLbResponse(backends1)); 1142 assertEquals(2, mockSubchannels.size()); 1143 Subchannel subchannel1 = mockSubchannels.poll(); 1144 Subchannel subchannel2 = mockSubchannels.poll(); 1145 1146 deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); 1147 deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING)); 1148 inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); 1149 1150 // Switch all subchannels to TRANSIENT_FAILURE, making the general state TRANSIENT_FAILURE too. 1151 Status error = Status.UNAVAILABLE.withDescription("error"); 1152 deliverSubchannelState(subchannel1, ConnectivityStateInfo.forTransientFailure(error)); 1153 inOrder.verify(helper).refreshNameResolution(); 1154 deliverSubchannelState(subchannel2, ConnectivityStateInfo.forTransientFailure(error)); 1155 inOrder.verify(helper).refreshNameResolution(); 1156 inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); 1157 assertThat(((RoundRobinPicker) pickerCaptor.getValue()).pickList) 1158 .containsExactly(new ErrorEntry(error)); 1159 1160 // Switch subchannel1 to IDLE, then to CONNECTING, which are ignored since the previous 1161 // subchannel state is TRANSIENT_FAILURE. General state is unchanged. 1162 deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(IDLE)); 1163 inOrder.verify(helper).refreshNameResolution(); 1164 deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); 1165 inOrder.verifyNoMoreInteractions(); 1166 1167 // Switch subchannel1 to READY, which will affect the general state 1168 deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); 1169 inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); 1170 assertThat(((RoundRobinPicker) pickerCaptor.getValue()).pickList) 1171 .containsExactly(new BackendEntry(subchannel1, getLoadRecorder(), "token0001")); 1172 inOrder.verifyNoMoreInteractions(); 1173 } 1174 1175 @Test grpclbFallback_initialTimeout_serverListReceivedBeforeTimerExpires()1176 public void grpclbFallback_initialTimeout_serverListReceivedBeforeTimerExpires() { 1177 subtestGrpclbFallbackTimeout(false, GrpclbState.FALLBACK_TIMEOUT_MS); 1178 } 1179 1180 @Test grpclbFallback_initialTimeout_timerExpires()1181 public void grpclbFallback_initialTimeout_timerExpires() { 1182 subtestGrpclbFallbackTimeout(true, GrpclbState.FALLBACK_TIMEOUT_MS); 1183 } 1184 1185 @Test grpclbFallback_timeout_serverListReceivedBeforeTimerExpires()1186 public void grpclbFallback_timeout_serverListReceivedBeforeTimerExpires() { 1187 subtestGrpclbFallbackTimeout(false, 12345); 1188 } 1189 1190 @Test grpclbFallback_timeout_timerExpires()1191 public void grpclbFallback_timeout_timerExpires() { 1192 subtestGrpclbFallbackTimeout(true, 12345); 1193 } 1194 1195 // Fallback or not within the period of the initial timeout. subtestGrpclbFallbackTimeout(boolean timerExpires, long timeout)1196 private void subtestGrpclbFallbackTimeout(boolean timerExpires, long timeout) { 1197 long loadReportIntervalMillis = 1983; 1198 InOrder inOrder = inOrder(helper, subchannelPool); 1199 1200 // Create balancer and backend addresses 1201 List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(2); 1202 List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1); 1203 deliverResolvedAddresses( 1204 backendList, grpclbBalancerList, GrpclbConfig.create(Mode.ROUND_ROBIN, null, timeout)); 1205 inOrder.verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)), 1206 eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); 1207 1208 // Attempted to connect to balancer 1209 assertEquals(1, fakeOobChannels.size()); 1210 ManagedChannel oobChannel = fakeOobChannels.poll(); 1211 verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 1212 StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); 1213 assertEquals(1, lbRequestObservers.size()); 1214 StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); 1215 1216 verify(lbRequestObserver).onNext( 1217 eq(LoadBalanceRequest.newBuilder().setInitialRequest( 1218 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 1219 .build())); 1220 lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); 1221 // We don't care if these methods have been run. 1222 inOrder.verify(helper, atLeast(0)).getSynchronizationContext(); 1223 inOrder.verify(helper, atLeast(0)).getScheduledExecutorService(); 1224 1225 inOrder.verifyNoMoreInteractions(); 1226 1227 assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); 1228 fakeClock.forwardTime(timeout - 1, TimeUnit.MILLISECONDS); 1229 assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); 1230 1231 ////////////////////////////////// 1232 // Fallback timer expires (or not) 1233 ////////////////////////////////// 1234 if (timerExpires) { 1235 logs.clear(); 1236 fakeClock.forwardTime(1, TimeUnit.MILLISECONDS); 1237 1238 assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); 1239 assertThat(logs) 1240 .containsExactly("INFO: [grpclb-<api.google.com>] Using fallback backends") 1241 .inOrder(); 1242 1243 // Fall back to the backends from resolver 1244 fallbackTestVerifyUseOfFallbackBackendLists(inOrder, backendList); 1245 1246 assertFalse(oobChannel.isShutdown()); 1247 verify(lbRequestObserver, never()).onCompleted(); 1248 } 1249 1250 ////////////////////////////////////////////////////////////////////// 1251 // Name resolver sends new resolution results without any backend addr 1252 ////////////////////////////////////////////////////////////////////// 1253 grpclbBalancerList = createResolvedBalancerAddresses(2); 1254 deliverResolvedAddresses( 1255 Collections.<EquivalentAddressGroup>emptyList(), 1256 grpclbBalancerList, 1257 GrpclbConfig.create(Mode.ROUND_ROBIN, null, timeout)); 1258 1259 // New addresses are updated to the OobChannel 1260 inOrder.verify(helper).updateOobChannelAddresses( 1261 same(oobChannel), eq(xattr(grpclbBalancerList))); 1262 1263 if (timerExpires) { 1264 // Still in fallback logic, except that the backend list is empty 1265 for (Subchannel subchannel : mockSubchannels) { 1266 verify(subchannelPool).returnSubchannel(eq(subchannel), any(ConnectivityStateInfo.class)); 1267 } 1268 1269 // RPC error status includes message of balancer RPC timeout 1270 inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); 1271 PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); 1272 assertThat(result.getStatus().getCode()) 1273 .isEqualTo(Code.UNAVAILABLE); 1274 assertThat(result.getStatus().getDescription()) 1275 .startsWith(GrpclbState.NO_FALLBACK_BACKENDS_STATUS.getDescription()); 1276 assertThat(result.getStatus().getDescription()) 1277 .contains(GrpclbState.BALANCER_TIMEOUT_STATUS.getDescription()); 1278 } 1279 1280 //////////////////////////////////////////////////////////////// 1281 // Name resolver sends new resolution results with backend addrs 1282 //////////////////////////////////////////////////////////////// 1283 // prevents the cached subchannel to be used 1284 subchannelPool.clear(); 1285 backendList = createResolvedBackendAddresses(2); 1286 grpclbBalancerList = createResolvedBalancerAddresses(1); 1287 deliverResolvedAddresses( 1288 backendList, grpclbBalancerList, GrpclbConfig.create(Mode.ROUND_ROBIN, null, timeout)); 1289 1290 // New LB address is updated to the OobChannel 1291 inOrder.verify(helper).updateOobChannelAddresses( 1292 same(oobChannel), eq(xattr(grpclbBalancerList))); 1293 1294 if (timerExpires) { 1295 // New backend addresses are used for fallback 1296 fallbackTestVerifyUseOfFallbackBackendLists( 1297 inOrder, Arrays.asList(backendList.get(0), backendList.get(1))); 1298 } 1299 1300 //////////////////////////////////////////////// 1301 // Break the LB stream after the timer expires 1302 //////////////////////////////////////////////// 1303 if (timerExpires) { 1304 Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken"); 1305 lbResponseObserver.onError(streamError.asException()); 1306 1307 // The error will NOT propagate to picker because fallback list is in use. 1308 inOrder.verify(helper, never()) 1309 .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); 1310 // A new stream is created 1311 verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); 1312 lbResponseObserver = lbResponseObserverCaptor.getValue(); 1313 assertEquals(1, lbRequestObservers.size()); 1314 lbRequestObserver = lbRequestObservers.poll(); 1315 verify(lbRequestObserver).onNext( 1316 eq(LoadBalanceRequest.newBuilder().setInitialRequest( 1317 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 1318 .build())); 1319 } 1320 1321 ///////////////////////////////// 1322 // Balancer returns a server list 1323 ///////////////////////////////// 1324 List<ServerEntry> serverList = Arrays.asList( 1325 new ServerEntry("127.0.0.1", 2000, "token0001"), 1326 new ServerEntry("127.0.0.1", 2010, "token0002")); 1327 lbResponseObserver.onNext(buildInitialResponse()); 1328 lbResponseObserver.onNext(buildLbResponse(serverList)); 1329 1330 // Balancer-provided server list now in effect 1331 fallbackTestVerifyUseOfBalancerBackendLists(inOrder, serverList); 1332 1333 /////////////////////////////////////////////////////////////// 1334 // New backend addresses from resolver outside of fallback mode 1335 /////////////////////////////////////////////////////////////// 1336 backendList = createResolvedBackendAddresses(1); 1337 grpclbBalancerList = createResolvedBalancerAddresses(1); 1338 deliverResolvedAddresses( 1339 backendList, grpclbBalancerList, GrpclbConfig.create(Mode.ROUND_ROBIN, null, timeout)); 1340 // Will not affect the round robin list at all 1341 inOrder.verify(helper, never()) 1342 .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); 1343 1344 // No fallback timeout timer scheduled. 1345 assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); 1346 } 1347 1348 @Test grpclbFallback_breakLbStreamBeforeFallbackTimerExpires()1349 public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() { 1350 long loadReportIntervalMillis = 1983; 1351 InOrder inOrder = inOrder(helper, subchannelPool); 1352 1353 // Create balancer and backend addresses 1354 List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(2); 1355 List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1); 1356 deliverResolvedAddresses(backendList, grpclbBalancerList); 1357 1358 inOrder.verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)), 1359 eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); 1360 1361 // Attempted to connect to balancer 1362 assertThat(fakeOobChannels).hasSize(1); 1363 verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 1364 StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); 1365 assertThat(lbRequestObservers).hasSize(1); 1366 StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); 1367 1368 verify(lbRequestObserver).onNext( 1369 eq(LoadBalanceRequest.newBuilder().setInitialRequest( 1370 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 1371 .build())); 1372 lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); 1373 // We don't care if these methods have been run. 1374 inOrder.verify(helper, atLeast(0)).getSynchronizationContext(); 1375 inOrder.verify(helper, atLeast(0)).getScheduledExecutorService(); 1376 1377 inOrder.verifyNoMoreInteractions(); 1378 1379 assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); 1380 1381 ///////////////////////////////////////////// 1382 // Break the LB stream before timer expires 1383 ///////////////////////////////////////////// 1384 Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken"); 1385 lbResponseObserver.onError(streamError.asException()); 1386 1387 // Fallback time has been short-circuited 1388 assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); 1389 1390 // Fall back to the backends from resolver 1391 fallbackTestVerifyUseOfFallbackBackendLists( 1392 inOrder, Arrays.asList(backendList.get(0), backendList.get(1))); 1393 1394 // A new stream is created 1395 verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); 1396 assertThat(lbRequestObservers).hasSize(1); 1397 lbRequestObserver = lbRequestObservers.poll(); 1398 verify(lbRequestObserver).onNext( 1399 eq(LoadBalanceRequest.newBuilder().setInitialRequest( 1400 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 1401 .build())); 1402 1403 ////////////////////////////////////////////////////////////////////// 1404 // Name resolver sends new resolution results without any backend addr 1405 ////////////////////////////////////////////////////////////////////// 1406 deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList); 1407 1408 // Still in fallback logic, except that the backend list is empty 1409 for (Subchannel subchannel : mockSubchannels) { 1410 verify(subchannelPool).returnSubchannel(eq(subchannel), any(ConnectivityStateInfo.class)); 1411 } 1412 1413 // RPC error status includes error of balancer stream 1414 inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); 1415 PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); 1416 assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); 1417 assertThat(result.getStatus().getDescription()) 1418 .startsWith(GrpclbState.NO_FALLBACK_BACKENDS_STATUS.getDescription()); 1419 assertThat(result.getStatus().getDescription()).contains(streamError.getDescription()); 1420 } 1421 1422 @Test grpclbFallback_noBalancerAddress()1423 public void grpclbFallback_noBalancerAddress() { 1424 InOrder inOrder = inOrder(helper, subchannelPool); 1425 1426 // Create 5 distinct backends 1427 List<EquivalentAddressGroup> backends = createResolvedBackendAddresses(5); 1428 1429 // Name resolver gives the first two backend addresses 1430 List<EquivalentAddressGroup> backendList = backends.subList(0, 2); 1431 deliverResolvedAddresses(backendList, Collections.<EquivalentAddressGroup>emptyList()); 1432 1433 assertThat(logs).contains("INFO: [grpclb-<api.google.com>] Using fallback backends"); 1434 1435 // Fall back to the backends from resolver 1436 fallbackTestVerifyUseOfFallbackBackendLists(inOrder, backendList); 1437 1438 // No fallback timeout timer scheduled. 1439 assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); 1440 verify(helper, never()) 1441 .createOobChannel(ArgumentMatchers.<EquivalentAddressGroup>anyList(), anyString()); 1442 logs.clear(); 1443 1444 ///////////////////////////////////////////////////////////////////////////////////////// 1445 // Name resolver sends new resolution results with new backend addr but no balancer addr 1446 ///////////////////////////////////////////////////////////////////////////////////////// 1447 // Name resolver then gives the last three backends 1448 backendList = backends.subList(2, 5); 1449 deliverResolvedAddresses(backendList, Collections.<EquivalentAddressGroup>emptyList()); 1450 1451 assertThat(logs).contains("INFO: [grpclb-<api.google.com>] Using fallback backends"); 1452 1453 // Shift to use updated backends 1454 fallbackTestVerifyUseOfFallbackBackendLists(inOrder, backendList); 1455 logs.clear(); 1456 1457 /////////////////////////////////////////////////////////////////////////////////////// 1458 // Name resolver sends new resolution results without any backend addr or balancer addr 1459 /////////////////////////////////////////////////////////////////////////////////////// 1460 deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), 1461 Collections.<EquivalentAddressGroup>emptyList()); 1462 assertThat(logs).containsExactly( 1463 "DEBUG: [grpclb-<api.google.com>] Error: Status{code=UNAVAILABLE, " 1464 + "description=No backend or balancer addresses found, cause=null}"); 1465 1466 // Keep using existing fallback addresses without interruption 1467 for (Subchannel subchannel : mockSubchannels) { 1468 verify(subchannelPool, never()) 1469 .returnSubchannel(eq(subchannel), any(ConnectivityStateInfo.class)); 1470 } 1471 verify(helper, never()) 1472 .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); 1473 } 1474 1475 /** 1476 * A test for a situation where we first only get backend addresses resolved and then in a 1477 * later name resolution get both backend and load balancer addresses. The first instance 1478 * will switch us to using fallback backends and it is important that in the second instance 1479 * we do not start a fallback timer as it will fail when it triggers if the fallback backends 1480 * are already in use. 1481 */ 1482 @Test grpclbFallback_noTimerWhenAlreadyInFallback()1483 public void grpclbFallback_noTimerWhenAlreadyInFallback() { 1484 // Initially we only get backend addresses without any LB ones. This should get us to use 1485 // fallback backends from the start as we won't be able to even talk to the load balancer. 1486 // No fallback timer would be started as we already started to use fallback backends. 1487 deliverResolvedAddresses(createResolvedBalancerAddresses(1), 1488 Collections.<EquivalentAddressGroup>emptyList()); 1489 assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); 1490 1491 // Later a new name resolution call happens and we get both backend and LB addresses. Since we 1492 // are already operating with fallback backends a fallback timer should not be started to move 1493 // us to fallback mode. 1494 deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), 1495 createResolvedBalancerAddresses(1)); 1496 1497 // If a fallback timer is started it will eventually throw an exception when it tries to switch 1498 // us to using fallback backends when we already are using them. 1499 assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); 1500 } 1501 1502 @Test grpclbFallback_balancerLost()1503 public void grpclbFallback_balancerLost() { 1504 subtestGrpclbFallbackConnectionLost(true, false); 1505 } 1506 1507 @Test grpclbFallback_subchannelsLost()1508 public void grpclbFallback_subchannelsLost() { 1509 subtestGrpclbFallbackConnectionLost(false, true); 1510 } 1511 1512 @Test grpclbFallback_allLost()1513 public void grpclbFallback_allLost() { 1514 subtestGrpclbFallbackConnectionLost(true, true); 1515 } 1516 1517 // Fallback outside of the initial timeout, where all connections are lost. subtestGrpclbFallbackConnectionLost( boolean balancerBroken, boolean allSubchannelsBroken)1518 private void subtestGrpclbFallbackConnectionLost( 1519 boolean balancerBroken, boolean allSubchannelsBroken) { 1520 long loadReportIntervalMillis = 1983; 1521 InOrder inOrder = inOrder(helper, mockLbService, subchannelPool); 1522 1523 // Create balancer and backend addresses 1524 List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(2); 1525 List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1); 1526 deliverResolvedAddresses(backendList, grpclbBalancerList); 1527 1528 inOrder.verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)), 1529 eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); 1530 1531 // Attempted to connect to balancer 1532 assertEquals(1, fakeOobChannels.size()); 1533 fakeOobChannels.poll(); 1534 inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 1535 StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); 1536 assertEquals(1, lbRequestObservers.size()); 1537 StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); 1538 1539 verify(lbRequestObserver).onNext( 1540 eq(LoadBalanceRequest.newBuilder().setInitialRequest( 1541 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 1542 .build())); 1543 lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); 1544 // We don't care if these methods have been run. 1545 inOrder.verify(helper, atLeast(0)).getSynchronizationContext(); 1546 inOrder.verify(helper, atLeast(0)).getScheduledExecutorService(); 1547 1548 inOrder.verifyNoMoreInteractions(); 1549 1550 // Balancer returns a server list 1551 List<ServerEntry> serverList = Arrays.asList( 1552 new ServerEntry("127.0.0.1", 2000, "token0001"), 1553 new ServerEntry("127.0.0.1", 2010, "token0002")); 1554 lbResponseObserver.onNext(buildInitialResponse()); 1555 lbResponseObserver.onNext(buildLbResponse(serverList)); 1556 1557 List<Subchannel> subchannels = fallbackTestVerifyUseOfBalancerBackendLists(inOrder, serverList); 1558 1559 // Break connections 1560 if (balancerBroken) { 1561 lbResponseObserver.onError(Status.UNAVAILABLE.asException()); 1562 // A new stream to LB is created 1563 inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 1564 lbResponseObserver = lbResponseObserverCaptor.getValue(); 1565 assertEquals(1, lbRequestObservers.size()); 1566 lbRequestObserver = lbRequestObservers.poll(); 1567 inOrder.verify(helper).refreshNameResolution(); 1568 } 1569 if (allSubchannelsBroken) { 1570 for (Subchannel subchannel : subchannels) { 1571 // A READY subchannel transits to IDLE when receiving a go-away 1572 deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE)); 1573 inOrder.verify(helper).refreshNameResolution(); 1574 } 1575 } 1576 1577 if (balancerBroken && allSubchannelsBroken) { 1578 // Going into fallback 1579 subchannels = fallbackTestVerifyUseOfFallbackBackendLists( 1580 inOrder, Arrays.asList(backendList.get(0), backendList.get(1))); 1581 1582 // When in fallback mode, fallback timer should not be scheduled when all backend 1583 // connections are lost 1584 for (Subchannel subchannel : subchannels) { 1585 deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE)); 1586 inOrder.verify(helper).refreshNameResolution(); 1587 } 1588 1589 // Exit fallback mode or cancel fallback timer when receiving a new server list from balancer 1590 List<ServerEntry> serverList2 = Arrays.asList( 1591 new ServerEntry("127.0.0.1", 2001, "token0003"), 1592 new ServerEntry("127.0.0.1", 2011, "token0004")); 1593 lbResponseObserver.onNext(buildInitialResponse()); 1594 lbResponseObserver.onNext(buildLbResponse(serverList2)); 1595 1596 fallbackTestVerifyUseOfBalancerBackendLists(inOrder, serverList2); 1597 } 1598 assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); 1599 1600 // No subchannel to fallback backends should have been created if no fallback happened 1601 if (!(balancerBroken && allSubchannelsBroken)) { 1602 verify(subchannelPool, never()).takeOrCreateSubchannel( 1603 eq(backendList.get(0)), any(Attributes.class)); 1604 verify(subchannelPool, never()).takeOrCreateSubchannel( 1605 eq(backendList.get(1)), any(Attributes.class)); 1606 } 1607 } 1608 1609 @Test grpclbFallback_allLost_failToFallback()1610 public void grpclbFallback_allLost_failToFallback() { 1611 long loadReportIntervalMillis = 1983; 1612 InOrder inOrder = inOrder(helper, mockLbService, subchannelPool); 1613 1614 // Create balancer and (empty) backend addresses 1615 List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1); 1616 deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList); 1617 1618 inOrder.verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)), 1619 eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); 1620 1621 // Attempted to connect to balancer 1622 assertEquals(1, fakeOobChannels.size()); 1623 fakeOobChannels.poll(); 1624 inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 1625 StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); 1626 assertEquals(1, lbRequestObservers.size()); 1627 StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); 1628 1629 verify(lbRequestObserver).onNext( 1630 eq(LoadBalanceRequest.newBuilder().setInitialRequest( 1631 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 1632 .build())); 1633 lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); 1634 // We don't care if these methods have been run. 1635 inOrder.verify(helper, atLeast(0)).getSynchronizationContext(); 1636 inOrder.verify(helper, atLeast(0)).getScheduledExecutorService(); 1637 1638 inOrder.verifyNoMoreInteractions(); 1639 1640 // Balancer returns a server list 1641 List<ServerEntry> serverList = Arrays.asList( 1642 new ServerEntry("127.0.0.1", 2000, "token0001"), 1643 new ServerEntry("127.0.0.1", 2010, "token0002")); 1644 lbResponseObserver.onNext(buildInitialResponse()); 1645 lbResponseObserver.onNext(buildLbResponse(serverList)); 1646 1647 List<Subchannel> subchannels = fallbackTestVerifyUseOfBalancerBackendLists(inOrder, serverList); 1648 1649 // Break connections 1650 lbResponseObserver.onError(Status.UNAVAILABLE.asException()); 1651 // A new stream to LB is created 1652 inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 1653 assertEquals(1, lbRequestObservers.size()); 1654 1655 // Break all subchannel connections 1656 Status error = Status.UNAUTHENTICATED.withDescription("Permission denied"); 1657 for (Subchannel subchannel : subchannels) { 1658 deliverSubchannelState(subchannel, ConnectivityStateInfo.forTransientFailure(error)); 1659 } 1660 1661 // Recycle all subchannels 1662 for (Subchannel subchannel : subchannels) { 1663 verify(subchannelPool).returnSubchannel(eq(subchannel), any(ConnectivityStateInfo.class)); 1664 } 1665 1666 // RPC error status includes errors of subchannels to balancer-provided backends 1667 inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); 1668 PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); 1669 assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); 1670 assertThat(result.getStatus().getDescription()) 1671 .startsWith(GrpclbState.NO_FALLBACK_BACKENDS_STATUS.getDescription()); 1672 assertThat(result.getStatus().getDescription()).contains(error.getDescription()); 1673 } 1674 fallbackTestVerifyUseOfFallbackBackendLists( InOrder inOrder, List<EquivalentAddressGroup> addrs)1675 private List<Subchannel> fallbackTestVerifyUseOfFallbackBackendLists( 1676 InOrder inOrder, List<EquivalentAddressGroup> addrs) { 1677 return fallbackTestVerifyUseOfBackendLists(inOrder, addrs, null); 1678 } 1679 fallbackTestVerifyUseOfBalancerBackendLists( InOrder inOrder, List<ServerEntry> servers)1680 private List<Subchannel> fallbackTestVerifyUseOfBalancerBackendLists( 1681 InOrder inOrder, List<ServerEntry> servers) { 1682 ArrayList<EquivalentAddressGroup> addrs = new ArrayList<>(); 1683 ArrayList<String> tokens = new ArrayList<>(); 1684 for (ServerEntry server : servers) { 1685 addrs.add(new EquivalentAddressGroup(server.addr, LB_BACKEND_ATTRS)); 1686 tokens.add(server.token); 1687 } 1688 return fallbackTestVerifyUseOfBackendLists(inOrder, addrs, tokens); 1689 } 1690 fallbackTestVerifyUseOfBackendLists( InOrder inOrder, List<EquivalentAddressGroup> addrs, @Nullable List<String> tokens)1691 private List<Subchannel> fallbackTestVerifyUseOfBackendLists( 1692 InOrder inOrder, List<EquivalentAddressGroup> addrs, 1693 @Nullable List<String> tokens) { 1694 if (tokens != null) { 1695 assertEquals(addrs.size(), tokens.size()); 1696 } 1697 for (EquivalentAddressGroup addr : addrs) { 1698 inOrder.verify(subchannelPool).takeOrCreateSubchannel(eq(addr), any(Attributes.class)); 1699 } 1700 RoundRobinPicker picker = (RoundRobinPicker) currentPicker; 1701 assertThat(picker.dropList).containsExactlyElementsIn(Collections.nCopies(addrs.size(), null)); 1702 assertThat(picker.pickList).containsExactly(GrpclbState.BUFFER_ENTRY); 1703 assertEquals(addrs.size(), mockSubchannels.size()); 1704 ArrayList<Subchannel> subchannels = new ArrayList<>(mockSubchannels); 1705 mockSubchannels.clear(); 1706 for (Subchannel subchannel : subchannels) { 1707 deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING)); 1708 } 1709 inOrder.verify(helper, atLeast(0)) 1710 .updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); 1711 inOrder.verify(helper, never()) 1712 .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); 1713 1714 ArrayList<BackendEntry> pickList = new ArrayList<>(); 1715 for (int i = 0; i < addrs.size(); i++) { 1716 Subchannel subchannel = subchannels.get(i); 1717 BackendEntry backend; 1718 if (tokens == null) { 1719 backend = new BackendEntry(subchannel); 1720 } else { 1721 backend = new BackendEntry(subchannel, getLoadRecorder(), tokens.get(i)); 1722 } 1723 pickList.add(backend); 1724 deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); 1725 inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); 1726 picker = (RoundRobinPicker) pickerCaptor.getValue(); 1727 assertThat(picker.dropList) 1728 .containsExactlyElementsIn(Collections.nCopies(addrs.size(), null)); 1729 assertThat(picker.pickList).containsExactlyElementsIn(pickList); 1730 inOrder.verify(helper, never()) 1731 .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); 1732 } 1733 return subchannels; 1734 } 1735 1736 @Test grpclbMultipleAuthorities()1737 public void grpclbMultipleAuthorities() throws Exception { 1738 List<EquivalentAddressGroup> backendList = Collections.singletonList( 1739 new EquivalentAddressGroup(new FakeSocketAddress("not-a-lb-address"))); 1740 List<EquivalentAddressGroup> grpclbBalancerList = Arrays.asList( 1741 new EquivalentAddressGroup( 1742 new FakeSocketAddress("fake-address-1"), 1743 lbAttributes("fake-authority-1")), 1744 new EquivalentAddressGroup( 1745 new FakeSocketAddress("fake-address-2"), 1746 lbAttributes("fake-authority-2")), 1747 new EquivalentAddressGroup( 1748 new FakeSocketAddress("fake-address-3"), 1749 lbAttributes("fake-authority-1").toBuilder() 1750 .set(GrpclbConstants.TOKEN_ATTRIBUTE_KEY, "value").build() 1751 )); 1752 deliverResolvedAddresses(backendList, grpclbBalancerList); 1753 1754 List<EquivalentAddressGroup> goldenOobEagList = 1755 Arrays.asList( 1756 new EquivalentAddressGroup( 1757 new FakeSocketAddress("fake-address-1"), 1758 Attributes.newBuilder() 1759 .set(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY, "fake-authority-1") 1760 .set(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE, "fake-authority-1") 1761 .build()), 1762 new EquivalentAddressGroup( 1763 new FakeSocketAddress("fake-address-2"), 1764 Attributes.newBuilder() 1765 .set(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY, "fake-authority-2") 1766 .set(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE, "fake-authority-2") 1767 .build()), 1768 new EquivalentAddressGroup( 1769 new FakeSocketAddress("fake-address-3"), 1770 Attributes.newBuilder() 1771 .set(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY, "fake-authority-1") 1772 .set(GrpclbConstants.TOKEN_ATTRIBUTE_KEY, "value") 1773 .set(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE, "fake-authority-1") 1774 .build() 1775 )); 1776 1777 verify(helper).createOobChannel(eq(goldenOobEagList), 1778 eq("fake-authority-1" + NO_USE_AUTHORITY_SUFFIX)); 1779 } 1780 1781 @Test grpclbBalancerStreamClosedAndRetried()1782 public void grpclbBalancerStreamClosedAndRetried() throws Exception { 1783 LoadBalanceRequest expectedInitialRequest = 1784 LoadBalanceRequest.newBuilder() 1785 .setInitialRequest( 1786 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 1787 .build(); 1788 InOrder inOrder = 1789 inOrder(mockLbService, backoffPolicyProvider, backoffPolicy1, backoffPolicy2, helper); 1790 List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1); 1791 deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList); 1792 1793 assertEquals(1, fakeOobChannels.size()); 1794 @SuppressWarnings("unused") 1795 ManagedChannel oobChannel = fakeOobChannels.poll(); 1796 1797 // First balancer RPC 1798 inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 1799 StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); 1800 assertEquals(1, lbRequestObservers.size()); 1801 StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); 1802 verify(lbRequestObserver).onNext(eq(expectedInitialRequest)); 1803 assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); 1804 1805 // Balancer closes it immediately (erroneously) 1806 lbResponseObserver.onCompleted(); 1807 1808 // Will start backoff sequence 1 (10ns) 1809 inOrder.verify(backoffPolicyProvider).get(); 1810 inOrder.verify(backoffPolicy1).nextBackoffNanos(); 1811 assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); 1812 inOrder.verify(helper).refreshNameResolution(); 1813 1814 // Fast-forward to a moment before the retry 1815 fakeClock.forwardNanos(9); 1816 verifyNoMoreInteractions(mockLbService); 1817 // Then time for retry 1818 fakeClock.forwardNanos(1); 1819 inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 1820 lbResponseObserver = lbResponseObserverCaptor.getValue(); 1821 assertEquals(1, lbRequestObservers.size()); 1822 lbRequestObserver = lbRequestObservers.poll(); 1823 verify(lbRequestObserver).onNext(eq(expectedInitialRequest)); 1824 assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); 1825 1826 // Balancer closes it with an error. 1827 lbResponseObserver.onError(Status.UNAVAILABLE.asException()); 1828 // Will continue the backoff sequence 1 (100ns) 1829 verifyNoMoreInteractions(backoffPolicyProvider); 1830 inOrder.verify(backoffPolicy1).nextBackoffNanos(); 1831 assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); 1832 inOrder.verify(helper).refreshNameResolution(); 1833 1834 // Fast-forward to a moment before the retry 1835 fakeClock.forwardNanos(100 - 1); 1836 verifyNoMoreInteractions(mockLbService); 1837 // Then time for retry 1838 fakeClock.forwardNanos(1); 1839 inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 1840 lbResponseObserver = lbResponseObserverCaptor.getValue(); 1841 assertEquals(1, lbRequestObservers.size()); 1842 lbRequestObserver = lbRequestObservers.poll(); 1843 verify(lbRequestObserver).onNext(eq(expectedInitialRequest)); 1844 assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); 1845 1846 // Balancer sends initial response. 1847 lbResponseObserver.onNext(buildInitialResponse()); 1848 1849 // Then breaks the RPC 1850 lbResponseObserver.onError(Status.UNAVAILABLE.asException()); 1851 1852 // Will reset the retry sequence and retry immediately, because balancer has responded. 1853 inOrder.verify(backoffPolicyProvider).get(); 1854 inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 1855 lbResponseObserver = lbResponseObserverCaptor.getValue(); 1856 assertEquals(1, lbRequestObservers.size()); 1857 lbRequestObserver = lbRequestObservers.poll(); 1858 verify(lbRequestObserver).onNext(eq(expectedInitialRequest)); 1859 inOrder.verify(helper).refreshNameResolution(); 1860 1861 // Fail the retry after spending 4ns 1862 fakeClock.forwardNanos(4); 1863 lbResponseObserver.onError(Status.UNAVAILABLE.asException()); 1864 1865 // Will be on the first retry (10ns) of backoff sequence 2. 1866 inOrder.verify(backoffPolicy2).nextBackoffNanos(); 1867 assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); 1868 inOrder.verify(helper).refreshNameResolution(); 1869 1870 // Fast-forward to a moment before the retry, the time spent in the last try is deducted. 1871 fakeClock.forwardNanos(10 - 4 - 1); 1872 verifyNoMoreInteractions(mockLbService); 1873 // Then time for retry 1874 fakeClock.forwardNanos(1); 1875 inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 1876 assertEquals(1, lbRequestObservers.size()); 1877 lbRequestObserver = lbRequestObservers.poll(); 1878 verify(lbRequestObserver).onNext(eq(expectedInitialRequest)); 1879 assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); 1880 1881 // Wrapping up 1882 verify(backoffPolicyProvider, times(2)).get(); 1883 verify(backoffPolicy1, times(2)).nextBackoffNanos(); 1884 verify(backoffPolicy2, times(1)).nextBackoffNanos(); 1885 verify(helper, times(4)).refreshNameResolution(); 1886 } 1887 1888 @Test grpclbWorking_pickFirstMode()1889 public void grpclbWorking_pickFirstMode() throws Exception { 1890 InOrder inOrder = inOrder(helper); 1891 1892 List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1); 1893 1894 deliverResolvedAddresses( 1895 Collections.<EquivalentAddressGroup>emptyList(), 1896 grpclbBalancerList, 1897 GrpclbConfig.create(Mode.PICK_FIRST)); 1898 1899 assertEquals(1, fakeOobChannels.size()); 1900 verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 1901 StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); 1902 assertEquals(1, lbRequestObservers.size()); 1903 StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); 1904 verify(lbRequestObserver).onNext( 1905 eq(LoadBalanceRequest.newBuilder().setInitialRequest( 1906 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 1907 .build())); 1908 1909 // Simulate receiving LB response 1910 List<ServerEntry> backends1 = Arrays.asList( 1911 new ServerEntry("127.0.0.1", 2000, "token0001"), 1912 new ServerEntry("127.0.0.1", 2010, "token0002")); 1913 inOrder.verify(helper, never()) 1914 .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); 1915 lbResponseObserver.onNext(buildInitialResponse()); 1916 lbResponseObserver.onNext(buildLbResponse(backends1)); 1917 1918 inOrder.verify(helper).createSubchannel(createSubchannelArgsCaptor.capture()); 1919 CreateSubchannelArgs createSubchannelArgs = createSubchannelArgsCaptor.getValue(); 1920 assertThat(createSubchannelArgs.getAddresses()) 1921 .containsExactly( 1922 new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), 1923 new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002"))); 1924 1925 // Initially IDLE 1926 inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); 1927 RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue(); 1928 1929 // Only one subchannel is created 1930 assertThat(mockSubchannels).hasSize(1); 1931 Subchannel subchannel = mockSubchannels.poll(); 1932 assertThat(picker0.dropList).containsExactly(null, null); 1933 assertThat(picker0.pickList).containsExactly(new IdleSubchannelEntry(subchannel, syncContext)); 1934 1935 // PICK_FIRST doesn't eagerly connect 1936 verify(subchannel, never()).requestConnection(); 1937 1938 // CONNECTING 1939 deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING)); 1940 inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); 1941 RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue(); 1942 assertThat(picker1.dropList).containsExactly(null, null); 1943 assertThat(picker1.pickList).containsExactly(BUFFER_ENTRY); 1944 1945 // TRANSIENT_FAILURE 1946 Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); 1947 deliverSubchannelState(subchannel, ConnectivityStateInfo.forTransientFailure(error)); 1948 inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); 1949 RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue(); 1950 assertThat(picker2.dropList).containsExactly(null, null); 1951 assertThat(picker2.pickList).containsExactly(new ErrorEntry(error)); 1952 1953 // READY 1954 deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); 1955 inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); 1956 RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue(); 1957 assertThat(picker3.dropList).containsExactly(null, null); 1958 assertThat(picker3.pickList).containsExactly( 1959 new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder()))); 1960 1961 1962 // New server list with drops 1963 List<ServerEntry> backends2 = Arrays.asList( 1964 new ServerEntry("127.0.0.1", 2000, "token0001"), 1965 new ServerEntry("token0003"), // drop 1966 new ServerEntry("127.0.0.1", 2020, "token0004")); 1967 inOrder.verify(helper, never()) 1968 .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); 1969 lbResponseObserver.onNext(buildLbResponse(backends2)); 1970 1971 // new addresses will be updated to the existing subchannel 1972 // createSubchannel() has ever been called only once 1973 verify(helper, times(1)).createSubchannel(any(CreateSubchannelArgs.class)); 1974 assertThat(mockSubchannels).isEmpty(); 1975 verify(subchannel).updateAddresses( 1976 eq(Arrays.asList( 1977 new EquivalentAddressGroup(backends2.get(0).addr, eagAttrsWithToken("token0001")), 1978 new EquivalentAddressGroup(backends2.get(2).addr, 1979 eagAttrsWithToken("token0004"))))); 1980 inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); 1981 RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue(); 1982 assertThat(picker4.dropList).containsExactly( 1983 null, new DropEntry(getLoadRecorder(), "token0003"), null); 1984 assertThat(picker4.pickList).containsExactly( 1985 new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder()))); 1986 1987 // Subchannel goes IDLE, but PICK_FIRST will not try to reconnect 1988 deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE)); 1989 inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); 1990 RoundRobinPicker picker5 = (RoundRobinPicker) pickerCaptor.getValue(); 1991 verify(subchannel, never()).requestConnection(); 1992 1993 // ... until it's selected 1994 PickSubchannelArgs args = mock(PickSubchannelArgs.class); 1995 PickResult pick = picker5.pickSubchannel(args); 1996 assertThat(pick).isSameInstanceAs(PickResult.withNoResult()); 1997 verify(subchannel).requestConnection(); 1998 1999 // ... or requested by application 2000 balancer.requestConnection(); 2001 verify(subchannel, times(2)).requestConnection(); 2002 2003 // PICK_FIRST doesn't use subchannelPool 2004 verify(subchannelPool, never()) 2005 .takeOrCreateSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class)); 2006 verify(subchannelPool, never()) 2007 .returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class)); 2008 } 2009 2010 @Test grpclbWorking_pickFirstMode_lbSendsEmptyAddress()2011 public void grpclbWorking_pickFirstMode_lbSendsEmptyAddress() throws Exception { 2012 InOrder inOrder = inOrder(helper); 2013 2014 List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1); 2015 deliverResolvedAddresses( 2016 Collections.<EquivalentAddressGroup>emptyList(), 2017 grpclbBalancerList, 2018 GrpclbConfig.create(Mode.PICK_FIRST)); 2019 2020 assertEquals(1, fakeOobChannels.size()); 2021 verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 2022 StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); 2023 assertEquals(1, lbRequestObservers.size()); 2024 StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); 2025 verify(lbRequestObserver).onNext( 2026 eq(LoadBalanceRequest.newBuilder().setInitialRequest( 2027 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 2028 .build())); 2029 2030 // Simulate receiving LB response 2031 List<ServerEntry> backends1 = Arrays.asList( 2032 new ServerEntry("127.0.0.1", 2000, "token0001"), 2033 new ServerEntry("127.0.0.1", 2010, "token0002")); 2034 inOrder.verify(helper, never()) 2035 .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); 2036 lbResponseObserver.onNext(buildInitialResponse()); 2037 lbResponseObserver.onNext(buildLbResponse(backends1)); 2038 2039 inOrder.verify(helper).createSubchannel(createSubchannelArgsCaptor.capture()); 2040 CreateSubchannelArgs createSubchannelArgs = createSubchannelArgsCaptor.getValue(); 2041 assertThat(createSubchannelArgs.getAddresses()) 2042 .containsExactly( 2043 new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), 2044 new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002"))); 2045 2046 // Initially IDLE 2047 inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); 2048 RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue(); 2049 2050 // Only one subchannel is created 2051 assertThat(mockSubchannels).hasSize(1); 2052 Subchannel subchannel = mockSubchannels.poll(); 2053 assertThat(picker0.dropList).containsExactly(null, null); 2054 assertThat(picker0.pickList).containsExactly(new IdleSubchannelEntry(subchannel, syncContext)); 2055 2056 // PICK_FIRST doesn't eagerly connect 2057 verify(subchannel, never()).requestConnection(); 2058 2059 // CONNECTING 2060 deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING)); 2061 inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); 2062 RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue(); 2063 assertThat(picker1.dropList).containsExactly(null, null); 2064 assertThat(picker1.pickList).containsExactly(BUFFER_ENTRY); 2065 2066 // TRANSIENT_FAILURE 2067 Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); 2068 deliverSubchannelState(subchannel, ConnectivityStateInfo.forTransientFailure(error)); 2069 inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); 2070 RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue(); 2071 assertThat(picker2.dropList).containsExactly(null, null); 2072 assertThat(picker2.pickList).containsExactly(new ErrorEntry(error)); 2073 2074 // READY 2075 deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); 2076 inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); 2077 RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue(); 2078 assertThat(picker3.dropList).containsExactly(null, null); 2079 assertThat(picker3.pickList).containsExactly( 2080 new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder()))); 2081 2082 inOrder.verify(helper, never()) 2083 .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); 2084 2085 // Empty addresses from LB 2086 lbResponseObserver.onNext(buildLbResponse(Collections.<ServerEntry>emptyList())); 2087 2088 // new addresses will be updated to the existing subchannel 2089 // createSubchannel() has ever been called only once 2090 inOrder.verify(helper, never()).createSubchannel(any(CreateSubchannelArgs.class)); 2091 assertThat(mockSubchannels).isEmpty(); 2092 verify(subchannel).shutdown(); 2093 2094 // RPC error status includes message of no backends provided by balancer 2095 inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); 2096 RoundRobinPicker errorPicker = (RoundRobinPicker) pickerCaptor.getValue(); 2097 assertThat(errorPicker.pickList) 2098 .containsExactly(new ErrorEntry(GrpclbState.NO_AVAILABLE_BACKENDS_STATUS)); 2099 2100 lbResponseObserver.onNext(buildLbResponse(Collections.<ServerEntry>emptyList())); 2101 2102 // Test recover from new LB response with addresses 2103 // New server list with drops 2104 List<ServerEntry> backends2 = Arrays.asList( 2105 new ServerEntry("127.0.0.1", 2000, "token0001"), 2106 new ServerEntry("token0003"), // drop 2107 new ServerEntry("127.0.0.1", 2020, "token0004")); 2108 inOrder.verify(helper, never()) 2109 .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); 2110 lbResponseObserver.onNext(buildLbResponse(backends2)); 2111 2112 // new addresses will be updated to the existing subchannel 2113 inOrder.verify(helper, times(1)).createSubchannel(any(CreateSubchannelArgs.class)); 2114 inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); 2115 subchannel = mockSubchannels.poll(); 2116 2117 // Subchannel became READY 2118 deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING)); 2119 deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); 2120 inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); 2121 RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue(); 2122 assertThat(picker4.pickList).containsExactly( 2123 new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder()))); 2124 } 2125 2126 @Test shutdownWithoutSubchannel_roundRobin()2127 public void shutdownWithoutSubchannel_roundRobin() throws Exception { 2128 subtestShutdownWithoutSubchannel(GrpclbConfig.create(Mode.ROUND_ROBIN)); 2129 } 2130 2131 @Test shutdownWithoutSubchannel_pickFirst()2132 public void shutdownWithoutSubchannel_pickFirst() throws Exception { 2133 subtestShutdownWithoutSubchannel(GrpclbConfig.create(Mode.PICK_FIRST)); 2134 } 2135 subtestShutdownWithoutSubchannel(GrpclbConfig grpclbConfig)2136 private void subtestShutdownWithoutSubchannel(GrpclbConfig grpclbConfig) { 2137 List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1); 2138 deliverResolvedAddresses( 2139 Collections.<EquivalentAddressGroup>emptyList(), 2140 grpclbBalancerList, 2141 grpclbConfig); 2142 verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 2143 assertEquals(1, lbRequestObservers.size()); 2144 StreamObserver<LoadBalanceRequest> requestObserver = lbRequestObservers.poll(); 2145 2146 verify(requestObserver, never()).onCompleted(); 2147 balancer.shutdown(); 2148 ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class); 2149 verify(requestObserver).onError(throwableCaptor.capture()); 2150 assertThat(Status.fromThrowable(throwableCaptor.getValue()).getCode()) 2151 .isEqualTo(Code.CANCELLED); 2152 } 2153 2154 @Test pickFirstMode_defaultTimeout_fallback()2155 public void pickFirstMode_defaultTimeout_fallback() throws Exception { 2156 pickFirstModeFallback(GrpclbState.FALLBACK_TIMEOUT_MS); 2157 } 2158 2159 @Test pickFirstMode_serviceConfigTimeout_fallback()2160 public void pickFirstMode_serviceConfigTimeout_fallback() throws Exception { 2161 pickFirstModeFallback(12345); 2162 } 2163 pickFirstModeFallback(long timeout)2164 private void pickFirstModeFallback(long timeout) throws Exception { 2165 InOrder inOrder = inOrder(helper); 2166 2167 // Name resolver returns balancer and backend addresses 2168 List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(2); 2169 List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1); 2170 deliverResolvedAddresses( 2171 backendList, grpclbBalancerList, GrpclbConfig.create(Mode.PICK_FIRST, null, timeout)); 2172 2173 // Attempted to connect to balancer 2174 assertEquals(1, fakeOobChannels.size()); 2175 verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 2176 StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); 2177 assertEquals(1, lbRequestObservers.size()); 2178 2179 // Fallback timer expires with no response 2180 fakeClock.forwardTime(timeout, TimeUnit.MILLISECONDS); 2181 2182 // Entering fallback mode 2183 inOrder.verify(helper).createSubchannel(createSubchannelArgsCaptor.capture()); 2184 CreateSubchannelArgs createSubchannelArgs = createSubchannelArgsCaptor.getValue(); 2185 assertThat(createSubchannelArgs.getAddresses()) 2186 .containsExactly(backendList.get(0), backendList.get(1)); 2187 2188 assertThat(mockSubchannels).hasSize(1); 2189 Subchannel subchannel = mockSubchannels.poll(); 2190 2191 // Initially IDLE 2192 inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); 2193 RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue(); 2194 2195 // READY 2196 deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); 2197 inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); 2198 RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue(); 2199 assertThat(picker1.dropList).containsExactly(null, null); 2200 assertThat(picker1.pickList).containsExactly( 2201 new BackendEntry(subchannel, new TokenAttachingTracerFactory(null))); 2202 2203 assertThat(picker0.dropList).containsExactly(null, null); 2204 assertThat(picker0.pickList).containsExactly(new IdleSubchannelEntry(subchannel, syncContext)); 2205 2206 2207 // Finally, an LB response, which brings us out of fallback 2208 List<ServerEntry> backends1 = Arrays.asList( 2209 new ServerEntry("127.0.0.1", 2000, "token0001"), 2210 new ServerEntry("127.0.0.1", 2010, "token0002")); 2211 inOrder.verify(helper, never()) 2212 .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); 2213 lbResponseObserver.onNext(buildInitialResponse()); 2214 lbResponseObserver.onNext(buildLbResponse(backends1)); 2215 2216 // new addresses will be updated to the existing subchannel 2217 // createSubchannel() has ever been called only once 2218 inOrder.verify(helper, never()).createSubchannel(any(CreateSubchannelArgs.class)); 2219 assertThat(mockSubchannels).isEmpty(); 2220 verify(subchannel).updateAddresses( 2221 eq(Arrays.asList( 2222 new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), 2223 new EquivalentAddressGroup(backends1.get(1).addr, 2224 eagAttrsWithToken("token0002"))))); 2225 inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); 2226 RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue(); 2227 assertThat(picker2.dropList).containsExactly(null, null); 2228 assertThat(picker2.pickList).containsExactly( 2229 new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder()))); 2230 2231 // PICK_FIRST doesn't use subchannelPool 2232 verify(subchannelPool, never()) 2233 .takeOrCreateSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class)); 2234 verify(subchannelPool, never()) 2235 .returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class)); 2236 } 2237 2238 @Test switchMode()2239 public void switchMode() throws Exception { 2240 InOrder inOrder = inOrder(helper); 2241 2242 List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1); 2243 deliverResolvedAddresses( 2244 Collections.<EquivalentAddressGroup>emptyList(), 2245 grpclbBalancerList, 2246 GrpclbConfig.create(Mode.ROUND_ROBIN)); 2247 2248 assertEquals(1, fakeOobChannels.size()); 2249 ManagedChannel oobChannel = fakeOobChannels.poll(); 2250 verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 2251 StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); 2252 assertEquals(1, lbRequestObservers.size()); 2253 StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); 2254 verify(lbRequestObserver).onNext( 2255 eq(LoadBalanceRequest.newBuilder().setInitialRequest( 2256 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 2257 .build())); 2258 2259 // Simulate receiving LB response 2260 List<ServerEntry> backends1 = Arrays.asList( 2261 new ServerEntry("127.0.0.1", 2000, "token0001"), 2262 new ServerEntry("127.0.0.1", 2010, "token0002")); 2263 inOrder.verify(helper, never()) 2264 .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); 2265 lbResponseObserver.onNext(buildInitialResponse()); 2266 lbResponseObserver.onNext(buildLbResponse(backends1)); 2267 2268 // ROUND_ROBIN: create one subchannel per server 2269 verify(subchannelPool).takeOrCreateSubchannel( 2270 eq(new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS)), 2271 any(Attributes.class)); 2272 verify(subchannelPool).takeOrCreateSubchannel( 2273 eq(new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS)), 2274 any(Attributes.class)); 2275 inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); 2276 assertEquals(2, mockSubchannels.size()); 2277 Subchannel subchannel1 = mockSubchannels.poll(); 2278 Subchannel subchannel2 = mockSubchannels.poll(); 2279 verify(subchannelPool, never()) 2280 .returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class)); 2281 2282 // Switch to PICK_FIRST 2283 deliverResolvedAddresses( 2284 Collections.<EquivalentAddressGroup>emptyList(), 2285 grpclbBalancerList, GrpclbConfig.create(Mode.PICK_FIRST)); 2286 2287 2288 // GrpclbState will be shutdown, and a new one will be created 2289 assertThat(oobChannel.isShutdown()).isTrue(); 2290 verify(subchannelPool) 2291 .returnSubchannel(same(subchannel1), eq(ConnectivityStateInfo.forNonError(IDLE))); 2292 verify(subchannelPool) 2293 .returnSubchannel(same(subchannel2), eq(ConnectivityStateInfo.forNonError(IDLE))); 2294 2295 // A new LB stream is created 2296 assertEquals(1, fakeOobChannels.size()); 2297 verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); 2298 lbResponseObserver = lbResponseObserverCaptor.getValue(); 2299 assertEquals(1, lbRequestObservers.size()); 2300 lbRequestObserver = lbRequestObservers.poll(); 2301 verify(lbRequestObserver).onNext( 2302 eq(LoadBalanceRequest.newBuilder().setInitialRequest( 2303 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 2304 .build())); 2305 2306 // Simulate receiving LB response 2307 inOrder.verify(helper, never()) 2308 .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); 2309 lbResponseObserver.onNext(buildInitialResponse()); 2310 lbResponseObserver.onNext(buildLbResponse(backends1)); 2311 2312 // PICK_FIRST Subchannel 2313 inOrder.verify(helper).createSubchannel(createSubchannelArgsCaptor.capture()); 2314 CreateSubchannelArgs createSubchannelArgs = createSubchannelArgsCaptor.getValue(); 2315 assertThat(createSubchannelArgs.getAddresses()) 2316 .containsExactly( 2317 new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), 2318 new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002"))); 2319 2320 inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); 2321 } 2322 eagAttrsWithToken(String token)2323 private static Attributes eagAttrsWithToken(String token) { 2324 return LB_BACKEND_ATTRS.toBuilder().set(GrpclbConstants.TOKEN_ATTRIBUTE_KEY, token).build(); 2325 } 2326 2327 @Test switchMode_nullLbPolicy()2328 public void switchMode_nullLbPolicy() throws Exception { 2329 InOrder inOrder = inOrder(helper); 2330 2331 final List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1); 2332 deliverResolvedAddresses( 2333 Collections.<EquivalentAddressGroup>emptyList(), 2334 grpclbBalancerList); 2335 2336 assertEquals(1, fakeOobChannels.size()); 2337 ManagedChannel oobChannel = fakeOobChannels.poll(); 2338 verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 2339 StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); 2340 assertEquals(1, lbRequestObservers.size()); 2341 StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); 2342 verify(lbRequestObserver).onNext( 2343 eq(LoadBalanceRequest.newBuilder().setInitialRequest( 2344 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 2345 .build())); 2346 2347 // Simulate receiving LB response 2348 List<ServerEntry> backends1 = Arrays.asList( 2349 new ServerEntry("127.0.0.1", 2000, "token0001"), 2350 new ServerEntry("127.0.0.1", 2010, "token0002")); 2351 inOrder.verify(helper, never()) 2352 .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); 2353 lbResponseObserver.onNext(buildInitialResponse()); 2354 lbResponseObserver.onNext(buildLbResponse(backends1)); 2355 2356 // ROUND_ROBIN: create one subchannel per server 2357 verify(subchannelPool).takeOrCreateSubchannel( 2358 eq(new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS)), 2359 any(Attributes.class)); 2360 verify(subchannelPool).takeOrCreateSubchannel( 2361 eq(new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS)), 2362 any(Attributes.class)); 2363 inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); 2364 assertEquals(2, mockSubchannels.size()); 2365 Subchannel subchannel1 = mockSubchannels.poll(); 2366 Subchannel subchannel2 = mockSubchannels.poll(); 2367 verify(subchannelPool, never()) 2368 .returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class)); 2369 2370 // Switch to PICK_FIRST 2371 deliverResolvedAddresses( 2372 Collections.<EquivalentAddressGroup>emptyList(), 2373 grpclbBalancerList, 2374 GrpclbConfig.create(Mode.PICK_FIRST)); 2375 2376 // GrpclbState will be shutdown, and a new one will be created 2377 assertThat(oobChannel.isShutdown()).isTrue(); 2378 verify(subchannelPool) 2379 .returnSubchannel(same(subchannel1), eq(ConnectivityStateInfo.forNonError(IDLE))); 2380 verify(subchannelPool) 2381 .returnSubchannel(same(subchannel2), eq(ConnectivityStateInfo.forNonError(IDLE))); 2382 2383 // A new LB stream is created 2384 assertEquals(1, fakeOobChannels.size()); 2385 verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); 2386 lbResponseObserver = lbResponseObserverCaptor.getValue(); 2387 assertEquals(1, lbRequestObservers.size()); 2388 lbRequestObserver = lbRequestObservers.poll(); 2389 verify(lbRequestObserver).onNext( 2390 eq(LoadBalanceRequest.newBuilder().setInitialRequest( 2391 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 2392 .build())); 2393 2394 // Simulate receiving LB response 2395 inOrder.verify(helper, never()) 2396 .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); 2397 lbResponseObserver.onNext(buildInitialResponse()); 2398 lbResponseObserver.onNext(buildLbResponse(backends1)); 2399 2400 // PICK_FIRST Subchannel 2401 inOrder.verify(helper).createSubchannel(createSubchannelArgsCaptor.capture()); 2402 CreateSubchannelArgs createSubchannelArgs = createSubchannelArgsCaptor.getValue(); 2403 assertThat(createSubchannelArgs.getAddresses()) 2404 .containsExactly( 2405 new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), 2406 new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002"))); 2407 2408 inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); 2409 } 2410 2411 @Test switchServiceName()2412 public void switchServiceName() throws Exception { 2413 InOrder inOrder = inOrder(helper); 2414 2415 String serviceName = "foo.google.com"; 2416 List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1); 2417 2418 deliverResolvedAddresses( 2419 Collections.<EquivalentAddressGroup>emptyList(), 2420 grpclbBalancerList, 2421 GrpclbConfig.create(Mode.ROUND_ROBIN, serviceName, GrpclbState.FALLBACK_TIMEOUT_MS)); 2422 2423 assertEquals(1, fakeOobChannels.size()); 2424 ManagedChannel oobChannel = fakeOobChannels.poll(); 2425 verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 2426 StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); 2427 assertEquals(1, lbRequestObservers.size()); 2428 StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); 2429 verify(lbRequestObserver).onNext( 2430 eq(LoadBalanceRequest.newBuilder().setInitialRequest( 2431 InitialLoadBalanceRequest.newBuilder().setName(serviceName).build()) 2432 .build())); 2433 2434 // Simulate receiving LB response 2435 List<ServerEntry> backends1 = Arrays.asList( 2436 new ServerEntry("127.0.0.1", 2000, "token0001"), 2437 new ServerEntry("127.0.0.1", 2010, "token0002")); 2438 inOrder.verify(helper, never()) 2439 .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); 2440 lbResponseObserver.onNext(buildInitialResponse()); 2441 lbResponseObserver.onNext(buildLbResponse(backends1)); 2442 2443 // ROUND_ROBIN: create one subchannel per server 2444 verify(subchannelPool).takeOrCreateSubchannel( 2445 eq(new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS)), 2446 any(Attributes.class)); 2447 verify(subchannelPool).takeOrCreateSubchannel( 2448 eq(new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS)), 2449 any(Attributes.class)); 2450 inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); 2451 assertEquals(2, mockSubchannels.size()); 2452 Subchannel subchannel1 = mockSubchannels.poll(); 2453 Subchannel subchannel2 = mockSubchannels.poll(); 2454 verify(subchannelPool, never()) 2455 .returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class)); 2456 2457 // Switch to different serviceName 2458 serviceName = "bar.google.com"; 2459 List<EquivalentAddressGroup> newGrpclbResolutionList = createResolvedBalancerAddresses(1); 2460 deliverResolvedAddresses( 2461 Collections.<EquivalentAddressGroup>emptyList(), 2462 newGrpclbResolutionList, 2463 GrpclbConfig.create(Mode.ROUND_ROBIN, serviceName, GrpclbState.FALLBACK_TIMEOUT_MS)); 2464 2465 // GrpclbState will be shutdown, and a new one will be created 2466 assertThat(oobChannel.isShutdown()).isTrue(); 2467 verify(subchannelPool) 2468 .returnSubchannel(same(subchannel1), eq(ConnectivityStateInfo.forNonError(IDLE))); 2469 verify(subchannelPool) 2470 .returnSubchannel(same(subchannel2), eq(ConnectivityStateInfo.forNonError(IDLE))); 2471 2472 assertEquals(1, fakeOobChannels.size()); 2473 verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); 2474 assertEquals(1, lbRequestObservers.size()); 2475 lbRequestObserver = lbRequestObservers.poll(); 2476 verify(lbRequestObserver).onNext( 2477 eq(LoadBalanceRequest.newBuilder().setInitialRequest( 2478 InitialLoadBalanceRequest.newBuilder().setName(serviceName).build()) 2479 .build())); 2480 } 2481 2482 @Test grpclbWorking_lbSendsFallbackMessage()2483 public void grpclbWorking_lbSendsFallbackMessage() { 2484 InOrder inOrder = inOrder(helper, subchannelPool); 2485 List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(2); 2486 List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(2); 2487 deliverResolvedAddresses(backendList, grpclbBalancerList); 2488 2489 // Fallback timer is started as soon as the addresses are resolved. 2490 assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); 2491 verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)), 2492 eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); 2493 assertEquals(1, fakeOobChannels.size()); 2494 ManagedChannel oobChannel = fakeOobChannels.poll(); 2495 verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 2496 StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); 2497 assertEquals(1, lbRequestObservers.size()); 2498 StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); 2499 verify(lbRequestObserver).onNext( 2500 eq(LoadBalanceRequest.newBuilder() 2501 .setInitialRequest( 2502 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 2503 .build())); 2504 2505 // Simulate receiving LB response 2506 ServerEntry backend1a = new ServerEntry("127.0.0.1", 2000, "token0001"); 2507 ServerEntry backend1b = new ServerEntry("127.0.0.1", 2010, "token0002"); 2508 List<ServerEntry> backends1 = Arrays.asList(backend1a, backend1b); 2509 inOrder.verify(helper, never()) 2510 .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); 2511 logs.clear(); 2512 lbResponseObserver.onNext(buildInitialResponse()); 2513 assertThat(logs).containsExactly( 2514 "INFO: [grpclb-<api.google.com>] Got an LB initial response: " + buildInitialResponse()); 2515 logs.clear(); 2516 lbResponseObserver.onNext(buildLbResponse(backends1)); 2517 2518 inOrder.verify(subchannelPool).takeOrCreateSubchannel( 2519 eq(new EquivalentAddressGroup(backend1a.addr, LB_BACKEND_ATTRS)), 2520 any(Attributes.class)); 2521 inOrder.verify(subchannelPool).takeOrCreateSubchannel( 2522 eq(new EquivalentAddressGroup(backend1b.addr, LB_BACKEND_ATTRS)), 2523 any(Attributes.class)); 2524 2525 assertEquals(2, mockSubchannels.size()); 2526 Subchannel subchannel1 = mockSubchannels.poll(); 2527 Subchannel subchannel2 = mockSubchannels.poll(); 2528 2529 verify(subchannel1).requestConnection(); 2530 verify(subchannel2).requestConnection(); 2531 assertEquals( 2532 new EquivalentAddressGroup(backend1a.addr, LB_BACKEND_ATTRS), 2533 subchannel1.getAddresses()); 2534 assertEquals( 2535 new EquivalentAddressGroup(backend1b.addr, LB_BACKEND_ATTRS), 2536 subchannel2.getAddresses()); 2537 2538 deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); 2539 deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING)); 2540 2541 inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); 2542 RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue(); 2543 assertThat(picker0.dropList).containsExactly(null, null); 2544 assertThat(picker0.pickList).containsExactly(BUFFER_ENTRY); 2545 inOrder.verifyNoMoreInteractions(); 2546 2547 assertThat(logs) 2548 .containsExactly( 2549 "DEBUG: [grpclb-<api.google.com>] Got an LB response: " + buildLbResponse(backends1)) 2550 .inOrder(); 2551 logs.clear(); 2552 2553 // Let subchannels be connected 2554 deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY)); 2555 inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); 2556 2557 RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue(); 2558 2559 assertThat(picker1.dropList).containsExactly(null, null); 2560 assertThat(picker1.pickList).containsExactly( 2561 new BackendEntry(subchannel2, getLoadRecorder(), "token0002")); 2562 2563 deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); 2564 inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); 2565 2566 RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue(); 2567 assertThat(picker2.dropList).containsExactly(null, null); 2568 assertThat(picker2.pickList).containsExactly( 2569 new BackendEntry(subchannel1, getLoadRecorder(), "token0001"), 2570 new BackendEntry(subchannel2, getLoadRecorder(), "token0002")) 2571 .inOrder(); 2572 2573 // Balancer forces entering fallback mode 2574 lbResponseObserver.onNext(buildLbFallbackResponse()); 2575 2576 // existing subchannels must be returned immediately to gracefully shutdown. 2577 verify(subchannelPool) 2578 .returnSubchannel(eq(subchannel1), eq(ConnectivityStateInfo.forNonError(READY))); 2579 verify(subchannelPool) 2580 .returnSubchannel(eq(subchannel2), eq(ConnectivityStateInfo.forNonError(READY))); 2581 2582 // verify fallback 2583 fallbackTestVerifyUseOfFallbackBackendLists(inOrder, backendList); 2584 2585 assertFalse(oobChannel.isShutdown()); 2586 verify(lbRequestObserver, never()).onCompleted(); 2587 2588 ////////////////////////////////////////////////////////////////////// 2589 // Name resolver sends new resolution results without any backend addr 2590 ////////////////////////////////////////////////////////////////////// 2591 deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList); 2592 2593 // Still in fallback logic, except that the backend list is empty 2594 for (Subchannel subchannel : mockSubchannels) { 2595 verify(subchannelPool).returnSubchannel(eq(subchannel), any(ConnectivityStateInfo.class)); 2596 } 2597 2598 // RPC error status includes message of fallback requested by balancer 2599 inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); 2600 PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); 2601 assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); 2602 assertThat(result.getStatus().getDescription()) 2603 .startsWith(GrpclbState.NO_FALLBACK_BACKENDS_STATUS.getDescription()); 2604 assertThat(result.getStatus().getDescription()) 2605 .contains(GrpclbState.BALANCER_REQUESTED_FALLBACK_STATUS.getDescription()); 2606 2607 // exit fall back by providing two new backends 2608 ServerEntry backend2a = new ServerEntry("127.0.0.1", 8000, "token1001"); 2609 ServerEntry backend2b = new ServerEntry("127.0.0.1", 8010, "token1002"); 2610 List<ServerEntry> backends2 = Arrays.asList(backend2a, backend2b); 2611 2612 inOrder.verify(helper, never()) 2613 .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); 2614 logs.clear(); 2615 lbResponseObserver.onNext(buildLbResponse(backends2)); 2616 2617 inOrder.verify(subchannelPool).takeOrCreateSubchannel( 2618 eq(new EquivalentAddressGroup(backend2a.addr, LB_BACKEND_ATTRS)), 2619 any(Attributes.class)); 2620 inOrder.verify(subchannelPool).takeOrCreateSubchannel( 2621 eq(new EquivalentAddressGroup(backend2b.addr, LB_BACKEND_ATTRS)), 2622 any(Attributes.class)); 2623 2624 assertEquals(2, mockSubchannels.size()); 2625 Subchannel subchannel3 = mockSubchannels.poll(); 2626 Subchannel subchannel4 = mockSubchannels.poll(); 2627 verify(subchannel3).requestConnection(); 2628 verify(subchannel4).requestConnection(); 2629 assertEquals( 2630 new EquivalentAddressGroup(backend2a.addr, LB_BACKEND_ATTRS), 2631 subchannel3.getAddresses()); 2632 assertEquals( 2633 new EquivalentAddressGroup(backend2b.addr, LB_BACKEND_ATTRS), 2634 subchannel4.getAddresses()); 2635 2636 deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(CONNECTING)); 2637 deliverSubchannelState(subchannel4, ConnectivityStateInfo.forNonError(CONNECTING)); 2638 2639 inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); 2640 RoundRobinPicker picker6 = (RoundRobinPicker) pickerCaptor.getValue(); 2641 assertThat(picker6.dropList).containsExactly(null, null); 2642 assertThat(picker6.pickList).containsExactly(BUFFER_ENTRY); 2643 inOrder.verifyNoMoreInteractions(); 2644 2645 assertThat(logs) 2646 .containsExactly( 2647 "DEBUG: [grpclb-<api.google.com>] Got an LB response: " + buildLbResponse(backends2)) 2648 .inOrder(); 2649 logs.clear(); 2650 2651 // Let new subchannels be connected 2652 deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(READY)); 2653 inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); 2654 2655 RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue(); 2656 assertThat(picker3.dropList).containsExactly(null, null); 2657 assertThat(picker3.pickList).containsExactly( 2658 new BackendEntry(subchannel3, getLoadRecorder(), "token1001")); 2659 2660 deliverSubchannelState(subchannel4, ConnectivityStateInfo.forNonError(READY)); 2661 inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); 2662 2663 RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue(); 2664 assertThat(picker4.dropList).containsExactly(null, null); 2665 assertThat(picker4.pickList).containsExactly( 2666 new BackendEntry(subchannel3, getLoadRecorder(), "token1001"), 2667 new BackendEntry(subchannel4, getLoadRecorder(), "token1002")) 2668 .inOrder(); 2669 } 2670 2671 @Test useIndependentRpcContext()2672 public void useIndependentRpcContext() { 2673 // Simulates making RPCs within the context of an inbound RPC. 2674 CancellableContext cancellableContext = Context.current().withCancellation(); 2675 Context prevContext = cancellableContext.attach(); 2676 try { 2677 List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(2); 2678 List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(2); 2679 deliverResolvedAddresses(backendList, grpclbBalancerList); 2680 2681 verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)), 2682 eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); 2683 verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 2684 StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); 2685 assertEquals(1, lbRequestObservers.size()); 2686 StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); 2687 verify(lbRequestObserver).onNext( 2688 eq(LoadBalanceRequest.newBuilder() 2689 .setInitialRequest( 2690 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 2691 .build())); 2692 lbResponseObserver.onNext(buildInitialResponse()); 2693 2694 // The inbound RPC finishes and closes its context. The outbound RPC's control plane RPC 2695 // should not be impacted (no retry). 2696 cancellableContext.close(); 2697 assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); 2698 verifyNoMoreInteractions(mockLbService); 2699 } finally { 2700 cancellableContext.detach(prevContext); 2701 } 2702 } 2703 deliverSubchannelState( final Subchannel subchannel, final ConnectivityStateInfo newState)2704 private void deliverSubchannelState( 2705 final Subchannel subchannel, final ConnectivityStateInfo newState) { 2706 ((FakeSubchannel) subchannel).updateState(newState); 2707 } 2708 deliverNameResolutionError(final Status error)2709 private void deliverNameResolutionError(final Status error) { 2710 syncContext.execute(new Runnable() { 2711 @Override 2712 public void run() { 2713 balancer.handleNameResolutionError(error); 2714 } 2715 }); 2716 } 2717 deliverResolvedAddresses( final List<EquivalentAddressGroup> backendAddrs, List<EquivalentAddressGroup> balancerAddrs)2718 private void deliverResolvedAddresses( 2719 final List<EquivalentAddressGroup> backendAddrs, 2720 List<EquivalentAddressGroup> balancerAddrs) { 2721 deliverResolvedAddresses(backendAddrs, balancerAddrs, GrpclbConfig.create(Mode.ROUND_ROBIN)); 2722 } 2723 deliverResolvedAddresses( final List<EquivalentAddressGroup> backendAddrs, List<EquivalentAddressGroup> balancerAddrs, final GrpclbConfig grpclbConfig)2724 private void deliverResolvedAddresses( 2725 final List<EquivalentAddressGroup> backendAddrs, 2726 List<EquivalentAddressGroup> balancerAddrs, 2727 final GrpclbConfig grpclbConfig) { 2728 final Attributes attrs = 2729 Attributes.newBuilder().set(GrpclbConstants.ATTR_LB_ADDRS, balancerAddrs).build(); 2730 syncContext.execute(new Runnable() { 2731 @Override 2732 public void run() { 2733 balancer.acceptResolvedAddresses( 2734 ResolvedAddresses.newBuilder() 2735 .setAddresses(backendAddrs) 2736 .setAttributes(attrs) 2737 .setLoadBalancingPolicyConfig(grpclbConfig) 2738 .build()); 2739 } 2740 }); 2741 } 2742 getLoadRecorder()2743 private GrpclbClientLoadRecorder getLoadRecorder() { 2744 return balancer.getGrpclbState().getLoadRecorder(); 2745 } 2746 createResolvedBackendAddresses(int n)2747 private static List<EquivalentAddressGroup> createResolvedBackendAddresses(int n) { 2748 List<EquivalentAddressGroup> list = new ArrayList<>(); 2749 for (int i = 0; i < n; i++) { 2750 SocketAddress addr = new FakeSocketAddress("fake-address-" + i); 2751 list.add(new EquivalentAddressGroup(addr)); 2752 } 2753 return list; 2754 } 2755 createResolvedBalancerAddresses(int n)2756 private static List<EquivalentAddressGroup> createResolvedBalancerAddresses(int n) { 2757 List<EquivalentAddressGroup> list = new ArrayList<>(); 2758 for (int i = 0; i < n; i++) { 2759 SocketAddress addr = new FakeSocketAddress("fake-address-" + i); 2760 list.add(new EquivalentAddressGroup(addr, lbAttributes(lbAuthority(i)))); 2761 } 2762 return list; 2763 } 2764 lbAuthority(int unused)2765 private static String lbAuthority(int unused) { 2766 // TODO(ejona): Support varying authorities 2767 return "lb.google.com"; 2768 } 2769 lbAttributes(String authority)2770 private static Attributes lbAttributes(String authority) { 2771 return Attributes.newBuilder() 2772 .set(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY, authority) 2773 .build(); 2774 } 2775 buildInitialResponse()2776 private static LoadBalanceResponse buildInitialResponse() { 2777 return buildInitialResponse(0); 2778 } 2779 buildInitialResponse(long loadReportIntervalMillis)2780 private static LoadBalanceResponse buildInitialResponse(long loadReportIntervalMillis) { 2781 return LoadBalanceResponse.newBuilder() 2782 .setInitialResponse( 2783 InitialLoadBalanceResponse.newBuilder() 2784 .setClientStatsReportInterval(Durations.fromMillis(loadReportIntervalMillis))) 2785 .build(); 2786 } 2787 buildLbFallbackResponse()2788 private static LoadBalanceResponse buildLbFallbackResponse() { 2789 return LoadBalanceResponse.newBuilder() 2790 .setFallbackResponse(FallbackResponse.newBuilder().build()) 2791 .build(); 2792 } 2793 buildLbResponse(List<ServerEntry> servers)2794 private static LoadBalanceResponse buildLbResponse(List<ServerEntry> servers) { 2795 ServerList.Builder serverListBuilder = ServerList.newBuilder(); 2796 for (ServerEntry server : servers) { 2797 if (server.addr != null) { 2798 serverListBuilder.addServers(Server.newBuilder() 2799 .setIpAddress(ByteString.copyFrom(server.addr.getAddress().getAddress())) 2800 .setPort(server.addr.getPort()) 2801 .setLoadBalanceToken(server.token) 2802 .build()); 2803 } else { 2804 serverListBuilder.addServers(Server.newBuilder() 2805 .setDrop(true) 2806 .setLoadBalanceToken(server.token) 2807 .build()); 2808 } 2809 } 2810 return LoadBalanceResponse.newBuilder() 2811 .setServerList(serverListBuilder.build()) 2812 .build(); 2813 } 2814 xattr(List<EquivalentAddressGroup> lbAddr)2815 private List<EquivalentAddressGroup> xattr(List<EquivalentAddressGroup> lbAddr) { 2816 List<EquivalentAddressGroup> oobAddr = new ArrayList<>(lbAddr.size()); 2817 for (EquivalentAddressGroup lb : lbAddr) { 2818 String authority = lb.getAttributes().get(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY); 2819 Attributes attrs = lb.getAttributes().toBuilder() 2820 .set(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE, authority) 2821 .build(); 2822 oobAddr.add(new EquivalentAddressGroup(lb.getAddresses(), attrs)); 2823 } 2824 return oobAddr; 2825 } 2826 2827 private static class ServerEntry { 2828 final InetSocketAddress addr; 2829 final String token; 2830 ServerEntry(String host, int port, String token)2831 ServerEntry(String host, int port, String token) { 2832 this.addr = new InetSocketAddress(host, port); 2833 this.token = token; 2834 } 2835 2836 // Drop entry ServerEntry(String token)2837 ServerEntry(String token) { 2838 this.addr = null; 2839 this.token = token; 2840 } 2841 } 2842 2843 private static class FakeSubchannel extends Subchannel { 2844 private final Attributes attributes; 2845 private List<EquivalentAddressGroup> eags; 2846 private SubchannelStateListener listener; 2847 FakeSubchannel(List<EquivalentAddressGroup> eags, Attributes attributes)2848 public FakeSubchannel(List<EquivalentAddressGroup> eags, Attributes attributes) { 2849 this.eags = Collections.unmodifiableList(eags); 2850 this.attributes = attributes; 2851 } 2852 2853 @Override getAllAddresses()2854 public List<EquivalentAddressGroup> getAllAddresses() { 2855 return eags; 2856 } 2857 2858 @Override getAttributes()2859 public Attributes getAttributes() { 2860 return attributes; 2861 } 2862 2863 @Override start(SubchannelStateListener listener)2864 public void start(SubchannelStateListener listener) { 2865 this.listener = checkNotNull(listener, "listener"); 2866 } 2867 2868 @Override updateAddresses(List<EquivalentAddressGroup> addrs)2869 public void updateAddresses(List<EquivalentAddressGroup> addrs) { 2870 this.eags = Collections.unmodifiableList(addrs); 2871 } 2872 2873 @Override shutdown()2874 public void shutdown() { 2875 } 2876 2877 @Override requestConnection()2878 public void requestConnection() { 2879 } 2880 updateState(ConnectivityStateInfo newState)2881 public void updateState(ConnectivityStateInfo newState) { 2882 listener.onSubchannelState(newState); 2883 } 2884 } 2885 2886 private class FakeHelper extends Helper { 2887 2888 @Override getSynchronizationContext()2889 public SynchronizationContext getSynchronizationContext() { 2890 return syncContext; 2891 } 2892 2893 @Override createOobChannel(List<EquivalentAddressGroup> eag, String authority)2894 public ManagedChannel createOobChannel(List<EquivalentAddressGroup> eag, String authority) { 2895 ManagedChannel channel = 2896 InProcessChannelBuilder 2897 .forName("fakeLb") 2898 .directExecutor() 2899 .overrideAuthority(authority) 2900 .build(); 2901 fakeOobChannels.add(channel); 2902 oobChannelTracker.add(channel); 2903 return channel; 2904 } 2905 2906 @Override createOobChannel(EquivalentAddressGroup eag, String authority)2907 public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) { 2908 return createOobChannel(Collections.singletonList(eag), authority); 2909 } 2910 2911 @Override createSubchannel(CreateSubchannelArgs args)2912 public Subchannel createSubchannel(CreateSubchannelArgs args) { 2913 FakeSubchannel subchannel = 2914 mock( 2915 FakeSubchannel.class, 2916 AdditionalAnswers 2917 .delegatesTo(new FakeSubchannel(args.getAddresses(), args.getAttributes()))); 2918 mockSubchannels.add(subchannel); 2919 unpooledSubchannelTracker.add(subchannel); 2920 return subchannel; 2921 } 2922 2923 @Override getScheduledExecutorService()2924 public ScheduledExecutorService getScheduledExecutorService() { 2925 return fakeClock.getScheduledExecutorService(); 2926 } 2927 2928 @Override getChannelLogger()2929 public ChannelLogger getChannelLogger() { 2930 return channelLogger; 2931 } 2932 2933 @Override updateBalancingState( @onnull ConnectivityState newState, @Nonnull SubchannelPicker newPicker)2934 public void updateBalancingState( 2935 @Nonnull ConnectivityState newState, @Nonnull SubchannelPicker newPicker) { 2936 currentPicker = newPicker; 2937 } 2938 2939 @Override refreshNameResolution()2940 public void refreshNameResolution() { 2941 } 2942 2943 @Override getAuthority()2944 public String getAuthority() { 2945 return SERVICE_AUTHORITY; 2946 } 2947 2948 @Override updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag)2949 public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) { 2950 } 2951 2952 @Override updateOobChannelAddresses(ManagedChannel channel, List<EquivalentAddressGroup> eag)2953 public void updateOobChannelAddresses(ManagedChannel channel, 2954 List<EquivalentAddressGroup> eag) { 2955 } 2956 } 2957 } 2958