1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.okhttp; 18 19 import static com.google.common.base.Charsets.UTF_8; 20 import static com.google.common.truth.Truth.assertThat; 21 import static io.grpc.internal.ClientStreamListener.RpcProgress.MISCARRIED; 22 import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED; 23 import static io.grpc.internal.ClientStreamListener.RpcProgress.REFUSED; 24 import static io.grpc.okhttp.Headers.CONTENT_TYPE_HEADER; 25 import static io.grpc.okhttp.Headers.HTTP_SCHEME_HEADER; 26 import static io.grpc.okhttp.Headers.METHOD_HEADER; 27 import static io.grpc.okhttp.Headers.TE_HEADER; 28 import static org.junit.Assert.assertEquals; 29 import static org.junit.Assert.assertFalse; 30 import static org.junit.Assert.assertNotNull; 31 import static org.junit.Assert.assertNull; 32 import static org.junit.Assert.assertSame; 33 import static org.junit.Assert.assertTrue; 34 import static org.junit.Assert.fail; 35 import static org.mockito.ArgumentMatchers.any; 36 import static org.mockito.ArgumentMatchers.anyBoolean; 37 import static org.mockito.ArgumentMatchers.anyInt; 38 import static org.mockito.ArgumentMatchers.eq; 39 import static org.mockito.ArgumentMatchers.isA; 40 import static org.mockito.ArgumentMatchers.same; 41 import static org.mockito.Mockito.inOrder; 42 import static org.mockito.Mockito.mock; 43 import static org.mockito.Mockito.never; 44 import static org.mockito.Mockito.reset; 45 import static org.mockito.Mockito.timeout; 46 import static org.mockito.Mockito.verify; 47 import static org.mockito.Mockito.verifyNoInteractions; 48 49 import com.google.common.base.Preconditions; 50 import com.google.common.base.Stopwatch; 51 import com.google.common.base.Supplier; 52 import com.google.common.base.Ticker; 53 import com.google.common.collect.ImmutableList; 54 import com.google.common.util.concurrent.Futures; 55 import com.google.common.util.concurrent.MoreExecutors; 56 import com.google.common.util.concurrent.SettableFuture; 57 import io.grpc.Attributes; 58 import io.grpc.CallOptions; 59 import io.grpc.ClientStreamTracer; 60 import io.grpc.HttpConnectProxiedSocketAddress; 61 import io.grpc.InternalChannelz.SocketStats; 62 import io.grpc.InternalChannelz.TransportStats; 63 import io.grpc.InternalInstrumented; 64 import io.grpc.InternalStatus; 65 import io.grpc.Metadata; 66 import io.grpc.MethodDescriptor; 67 import io.grpc.MethodDescriptor.MethodType; 68 import io.grpc.Status; 69 import io.grpc.Status.Code; 70 import io.grpc.StatusException; 71 import io.grpc.internal.AbstractStream; 72 import io.grpc.internal.ClientStreamListener; 73 import io.grpc.internal.ClientTransport; 74 import io.grpc.internal.FakeClock; 75 import io.grpc.internal.GrpcUtil; 76 import io.grpc.internal.ManagedClientTransport; 77 import io.grpc.okhttp.OkHttpClientTransport.ClientFrameHandler; 78 import io.grpc.okhttp.OkHttpFrameLogger.Direction; 79 import io.grpc.okhttp.internal.Protocol; 80 import io.grpc.okhttp.internal.framed.ErrorCode; 81 import io.grpc.okhttp.internal.framed.FrameReader; 82 import io.grpc.okhttp.internal.framed.FrameWriter; 83 import io.grpc.okhttp.internal.framed.Header; 84 import io.grpc.okhttp.internal.framed.HeadersMode; 85 import io.grpc.okhttp.internal.framed.Settings; 86 import io.grpc.okhttp.internal.framed.Variant; 87 import io.grpc.testing.TestMethodDescriptors; 88 import java.io.BufferedReader; 89 import java.io.ByteArrayInputStream; 90 import java.io.IOException; 91 import java.io.InputStream; 92 import java.io.InputStreamReader; 93 import java.io.OutputStream; 94 import java.io.PipedInputStream; 95 import java.io.PipedOutputStream; 96 import java.net.InetAddress; 97 import java.net.InetSocketAddress; 98 import java.net.ServerSocket; 99 import java.net.Socket; 100 import java.net.SocketAddress; 101 import java.util.ArrayDeque; 102 import java.util.ArrayList; 103 import java.util.Arrays; 104 import java.util.List; 105 import java.util.Queue; 106 import java.util.concurrent.CountDownLatch; 107 import java.util.concurrent.ExecutionException; 108 import java.util.concurrent.ExecutorService; 109 import java.util.concurrent.Executors; 110 import java.util.concurrent.LinkedBlockingQueue; 111 import java.util.concurrent.TimeUnit; 112 import java.util.concurrent.atomic.AtomicBoolean; 113 import java.util.logging.Handler; 114 import java.util.logging.Level; 115 import java.util.logging.LogRecord; 116 import java.util.logging.Logger; 117 import javax.annotation.Nullable; 118 import javax.net.SocketFactory; 119 import okio.Buffer; 120 import okio.BufferedSink; 121 import okio.BufferedSource; 122 import okio.ByteString; 123 import org.junit.After; 124 import org.junit.Rule; 125 import org.junit.Test; 126 import org.junit.rules.Timeout; 127 import org.junit.runner.RunWith; 128 import org.junit.runners.JUnit4; 129 import org.mockito.AdditionalAnswers; 130 import org.mockito.ArgumentCaptor; 131 import org.mockito.ArgumentMatchers; 132 import org.mockito.InOrder; 133 import org.mockito.Mock; 134 import org.mockito.junit.MockitoJUnit; 135 import org.mockito.junit.MockitoRule; 136 137 /** 138 * Tests for {@link OkHttpClientTransport}. 139 */ 140 @RunWith(JUnit4.class) 141 public class OkHttpClientTransportTest { 142 private static final int TIME_OUT_MS = 2000; 143 private static final int INITIAL_WINDOW_SIZE = 65535; 144 private static final String NETWORK_ISSUE_MESSAGE = "network issue"; 145 private static final String ERROR_MESSAGE = "simulated error"; 146 // The gRPC header length, which includes 1 byte compression flag and 4 bytes message length. 147 private static final int HEADER_LENGTH = 5; 148 private static final Status SHUTDOWN_REASON = Status.UNAVAILABLE.withDescription("for test"); 149 private static final HttpConnectProxiedSocketAddress NO_PROXY = null; 150 private static final int DEFAULT_START_STREAM_ID = 3; 151 private static final Attributes EAG_ATTRS = Attributes.EMPTY; 152 private static final Logger logger = Logger.getLogger(OkHttpClientTransport.class.getName()); 153 private static final ClientStreamTracer[] tracers = new ClientStreamTracer[] { 154 new ClientStreamTracer() {} 155 }; 156 157 @Rule public final Timeout globalTimeout = Timeout.seconds(10); 158 @Rule public final MockitoRule mocks = MockitoJUnit.rule(); 159 160 private MethodDescriptor<Void, Void> method = TestMethodDescriptors.voidMethod(); 161 162 @Mock 163 private ManagedClientTransport.Listener transportListener; 164 165 private final Queue<Buffer> capturedBuffer = new ArrayDeque<>(); 166 private OkHttpClientTransport clientTransport; 167 private final MockFrameReader frameReader = new MockFrameReader(); 168 private final Socket socket = new MockSocket(frameReader); 169 private final FrameWriter frameWriter = mock(FrameWriter.class, AdditionalAnswers.delegatesTo( 170 new MockFrameWriter(socket, capturedBuffer))); 171 private ExecutorService executor = Executors.newCachedThreadPool(); 172 private long nanoTime; // backs a ticker, for testing ping round-trip time measurement 173 private SettableFuture<Void> connectedFuture; 174 private Runnable tooManyPingsRunnable = new Runnable() { 175 @Override public void run() { 176 throw new AssertionError(); 177 } 178 }; 179 private OkHttpChannelBuilder channelBuilder = OkHttpChannelBuilder.forAddress("127.0.0.1", 1234) 180 .usePlaintext() 181 .executor(new FakeClock().getScheduledExecutorService()) // Executor unused 182 .scheduledExecutorService(new FakeClock().getScheduledExecutorService()) // Executor unused 183 .transportExecutor(executor) 184 .flowControlWindow(INITIAL_WINDOW_SIZE); 185 186 @After tearDown()187 public void tearDown() { 188 executor.shutdownNow(); 189 } 190 initTransport()191 private void initTransport() throws Exception { 192 startTransport( 193 DEFAULT_START_STREAM_ID, null, true, null); 194 } 195 initTransport(int startId)196 private void initTransport(int startId) throws Exception { 197 startTransport(startId, null, true, null); 198 } 199 startTransport(int startId, @Nullable Runnable connectingCallback, boolean waitingForConnected, String userAgent)200 private void startTransport(int startId, @Nullable Runnable connectingCallback, 201 boolean waitingForConnected, String userAgent) 202 throws Exception { 203 connectedFuture = SettableFuture.create(); 204 final Ticker ticker = new Ticker() { 205 @Override 206 public long read() { 207 return nanoTime; 208 } 209 }; 210 Supplier<Stopwatch> stopwatchSupplier = new Supplier<Stopwatch>() { 211 @Override 212 public Stopwatch get() { 213 return Stopwatch.createUnstarted(ticker); 214 } 215 }; 216 channelBuilder.socketFactory(new FakeSocketFactory(socket)); 217 clientTransport = new OkHttpClientTransport( 218 channelBuilder.buildTransportFactory(), 219 userAgent, 220 stopwatchSupplier, 221 new FakeVariant(frameReader, frameWriter), 222 connectingCallback, 223 connectedFuture, 224 tooManyPingsRunnable); 225 clientTransport.start(transportListener); 226 if (waitingForConnected) { 227 connectedFuture.get(TIME_OUT_MS, TimeUnit.MILLISECONDS); 228 } 229 if (startId != DEFAULT_START_STREAM_ID) { 230 clientTransport.setNextStreamId(startId); 231 } 232 } 233 234 @Test testToString()235 public void testToString() throws Exception { 236 InetSocketAddress address = InetSocketAddress.createUnresolved("hostname", 31415); 237 clientTransport = new OkHttpClientTransport( 238 channelBuilder.buildTransportFactory(), 239 address, 240 "hostname", 241 /*userAgent=*/ null, 242 EAG_ATTRS, 243 NO_PROXY, 244 tooManyPingsRunnable); 245 String s = clientTransport.toString(); 246 assertTrue("Unexpected: " + s, s.contains("OkHttpClientTransport")); 247 assertTrue("Unexpected: " + s, s.contains(address.toString())); 248 } 249 250 /** 251 * Test logging is functioning correctly for client received Http/2 frames. Not intended to test 252 * actual frame content being logged. 253 */ 254 @Test testClientHandlerFrameLogger()255 public void testClientHandlerFrameLogger() throws Exception { 256 final List<LogRecord> logs = new ArrayList<>(); 257 Handler handler = new Handler() { 258 @Override 259 public void publish(LogRecord record) { 260 logs.add(record); 261 } 262 263 @Override 264 public void flush() { 265 } 266 267 @Override 268 public void close() throws SecurityException { 269 } 270 }; 271 logger.addHandler(handler); 272 logger.setLevel(Level.ALL); 273 274 initTransport(); 275 assertThat(logs).hasSize(1); 276 LogRecord log = logs.remove(0); 277 assertThat(log.getMessage()).startsWith(Direction.OUTBOUND + " SETTINGS: ack=false"); 278 assertThat(log.getLevel()).isEqualTo(Level.FINE); 279 280 MockStreamListener listener = new MockStreamListener(); 281 OkHttpClientStream stream = 282 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 283 stream.start(listener); 284 stream.request(1); 285 286 frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS); 287 assertThat(logs).hasSize(1); 288 log = logs.remove(0); 289 assertThat(log.getMessage()).startsWith(Direction.INBOUND + " HEADERS: streamId=" + 3); 290 assertThat(log.getLevel()).isEqualTo(Level.FINE); 291 292 final String message = "Hello Client"; 293 Buffer buffer = createMessageFrame(message); 294 frameHandler().data(false, 3, buffer, (int) buffer.size()); 295 assertThat(logs).hasSize(1); 296 log = logs.remove(0); 297 assertThat(log.getMessage()).startsWith(Direction.INBOUND + " DATA: streamId=" + 3); 298 assertThat(log.getLevel()).isEqualTo(Level.FINE); 299 300 // At most 64 bytes of data frame will be logged. 301 frameHandler().data(false, 3, createMessageFrame(new String(new char[1000])), 1000); 302 assertThat(logs).hasSize(1); 303 log = logs.remove(0); 304 String data = log.getMessage(); 305 assertThat(data).endsWith("..."); 306 assertThat(data.substring(data.indexOf("bytes="), data.indexOf("..."))).hasLength(64 * 2 + 6); 307 308 // A SETTINGS ACK frame is sent out after receiving SETTINGS frame. 309 frameHandler().settings(false, new Settings()); 310 assertThat(logs).hasSize(2); 311 log = logs.remove(0); 312 assertThat(log.getMessage()).startsWith(Direction.INBOUND + " SETTINGS: ack=false"); 313 assertThat(log.getLevel()).isEqualTo(Level.FINE); 314 log = logs.remove(0); 315 assertThat(log.getMessage()).startsWith(Direction.OUTBOUND + " SETTINGS: ack=true"); 316 assertThat(log.getLevel()).isEqualTo(Level.FINE); 317 318 // A PING ACK frame is sent out after receiving PING frame. 319 frameHandler().ping(false, 0, 0); 320 assertThat(logs).hasSize(2); 321 log = logs.remove(0); 322 assertThat(log.getMessage()).startsWith(Direction.INBOUND + " PING: ack=false"); 323 assertThat(log.getLevel()).isEqualTo(Level.FINE); 324 log = logs.remove(0); 325 assertThat(log.getMessage()).startsWith(Direction.OUTBOUND + " PING: ack=true"); 326 assertThat(log.getLevel()).isEqualTo(Level.FINE); 327 328 // As server push is not supported, a RST_STREAM is sent out after receiving PUSH_PROMISE frame. 329 frameHandler().pushPromise(3, 3, grpcResponseHeaders()); 330 assertThat(logs).hasSize(2); 331 log = logs.remove(0); 332 assertThat(log.getMessage()).startsWith(Direction.INBOUND + " PUSH_PROMISE"); 333 assertThat(log.getLevel()).isEqualTo(Level.FINE); 334 log = logs.remove(0); 335 assertThat(log.getMessage()).startsWith(Direction.OUTBOUND + " RST_STREAM"); 336 assertThat(log.getLevel()).isEqualTo(Level.FINE); 337 338 frameHandler().rstStream(3, ErrorCode.CANCEL); 339 assertThat(logs).hasSize(1); 340 log = logs.remove(0); 341 assertThat(log.getMessage()).startsWith(Direction.INBOUND + " RST_STREAM"); 342 assertThat(log.getLevel()).isEqualTo(Level.FINE); 343 344 // Outbound GO_AWAY is responded after receiving inbound GO_AWAY frame. 345 frameHandler().goAway(3, ErrorCode.CANCEL, ByteString.EMPTY); 346 assertThat(logs).hasSize(2); 347 log = logs.remove(0); 348 assertThat(log.getMessage()).startsWith(Direction.INBOUND + " GO_AWAY"); 349 assertThat(log.getLevel()).isEqualTo(Level.FINE); 350 log = logs.remove(0); 351 assertThat(log.getMessage()).startsWith(Direction.OUTBOUND + " GO_AWAY"); 352 assertThat(log.getLevel()).isEqualTo(Level.FINE); 353 354 frameHandler().windowUpdate(3, 32); 355 assertThat(logs).hasSize(1); 356 log = logs.remove(0); 357 assertThat(log.getMessage()).startsWith(Direction.INBOUND + " WINDOW_UPDATE"); 358 assertThat(log.getLevel()).isEqualTo(Level.FINE); 359 360 logger.removeHandler(handler); 361 } 362 363 @Test maxMessageSizeShouldBeEnforced()364 public void maxMessageSizeShouldBeEnforced() throws Exception { 365 channelBuilder.maxInboundMessageSize(1); 366 initTransport(); 367 368 MockStreamListener listener = new MockStreamListener(); 369 OkHttpClientStream stream = 370 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 371 stream.start(listener); 372 stream.request(1); 373 assertContainStream(3); 374 frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS); 375 assertNotNull(listener.headers); 376 377 // Receive the message. 378 final String message = "Hello Client"; 379 Buffer buffer = createMessageFrame(message); 380 frameHandler().data(false, 3, buffer, (int) buffer.size()); 381 382 listener.waitUntilStreamClosed(); 383 assertEquals(Code.RESOURCE_EXHAUSTED, listener.status.getCode()); 384 shutdownAndVerify(); 385 } 386 387 @Test includeInitialWindowSizeInFirstSettings()388 public void includeInitialWindowSizeInFirstSettings() throws Exception { 389 channelBuilder.flowControlWindow(65535); 390 initTransport(); 391 392 ArgumentCaptor<Settings> settings = ArgumentCaptor.forClass(Settings.class); 393 verify(frameWriter, timeout(TIME_OUT_MS)).settings(settings.capture()); 394 assertEquals(65535, settings.getValue().get(7)); 395 } 396 397 /** 398 * A "large" window size is anything over 65535 (the starting size for any connection-level 399 * flow control value). 400 */ 401 @Test includeInitialWindowSizeInFirstSettings_largeWindowSize()402 public void includeInitialWindowSizeInFirstSettings_largeWindowSize() throws Exception { 403 channelBuilder.flowControlWindow(75535); // 65535 + 10000 404 initTransport(); 405 406 ArgumentCaptor<Settings> settings = ArgumentCaptor.forClass(Settings.class); 407 verify(frameWriter, timeout(TIME_OUT_MS)).settings(settings.capture()); 408 assertEquals(75535, settings.getValue().get(7)); 409 410 verify(frameWriter, timeout(TIME_OUT_MS)).windowUpdate(0, 10000); 411 } 412 413 /** 414 * When nextFrame throws IOException, the transport should be aborted. 415 */ 416 @Test nextFrameThrowIoException()417 public void nextFrameThrowIoException() throws Exception { 418 initTransport(); 419 MockStreamListener listener1 = new MockStreamListener(); 420 MockStreamListener listener2 = new MockStreamListener(); 421 OkHttpClientStream stream1 = 422 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 423 stream1.start(listener1); 424 stream1.request(1); 425 OkHttpClientStream stream2 = 426 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 427 stream2.start(listener2); 428 stream2.request(1); 429 assertEquals(2, activeStreamCount()); 430 assertContainStream(3); 431 assertContainStream(5); 432 frameReader.throwIoExceptionForNextFrame(); 433 listener1.waitUntilStreamClosed(); 434 listener2.waitUntilStreamClosed(); 435 436 assertEquals(0, activeStreamCount()); 437 assertEquals(Status.INTERNAL.getCode(), listener1.status.getCode()); 438 assertEquals(NETWORK_ISSUE_MESSAGE, listener1.status.getCause().getMessage()); 439 assertEquals(Status.INTERNAL.getCode(), listener2.status.getCode()); 440 assertEquals(NETWORK_ISSUE_MESSAGE, listener2.status.getCause().getMessage()); 441 verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(isA(Status.class)); 442 verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); 443 shutdownAndVerify(); 444 } 445 446 /** 447 * Test that even if an Error is thrown from the reading loop of the transport, 448 * it can still clean up and call transportShutdown() and transportTerminated() as expected 449 * by the channel. 450 */ 451 @Test nextFrameThrowsError()452 public void nextFrameThrowsError() throws Exception { 453 initTransport(); 454 MockStreamListener listener = new MockStreamListener(); 455 OkHttpClientStream stream = 456 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 457 stream.start(listener); 458 stream.request(1); 459 assertEquals(1, activeStreamCount()); 460 assertContainStream(3); 461 frameReader.throwErrorForNextFrame(); 462 listener.waitUntilStreamClosed(); 463 464 assertEquals(0, activeStreamCount()); 465 assertEquals(Status.INTERNAL.getCode(), listener.status.getCode()); 466 assertEquals(ERROR_MESSAGE, listener.status.getCause().getMessage()); 467 verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(isA(Status.class)); 468 verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); 469 shutdownAndVerify(); 470 } 471 472 @Test nextFrameReturnFalse()473 public void nextFrameReturnFalse() throws Exception { 474 initTransport(); 475 MockStreamListener listener = new MockStreamListener(); 476 OkHttpClientStream stream = 477 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 478 stream.start(listener); 479 stream.request(1); 480 frameReader.nextFrameAtEndOfStream(); 481 listener.waitUntilStreamClosed(); 482 assertEquals(Status.UNAVAILABLE.getCode(), listener.status.getCode()); 483 verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(isA(Status.class)); 484 verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); 485 shutdownAndVerify(); 486 } 487 488 @Test readMessages()489 public void readMessages() throws Exception { 490 initTransport(); 491 final int numMessages = 10; 492 final String message = "Hello Client"; 493 MockStreamListener listener = new MockStreamListener(); 494 OkHttpClientStream stream = 495 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 496 stream.start(listener); 497 stream.request(numMessages); 498 assertContainStream(3); 499 frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS); 500 assertNotNull(listener.headers); 501 for (int i = 0; i < numMessages; i++) { 502 Buffer buffer = createMessageFrame(message + i); 503 frameHandler().data(false, 3, buffer, (int) buffer.size()); 504 } 505 frameHandler().headers(true, true, 3, 0, grpcResponseTrailers(), HeadersMode.HTTP_20_HEADERS); 506 listener.waitUntilStreamClosed(); 507 assertEquals(Status.OK, listener.status); 508 assertNotNull(listener.trailers); 509 assertEquals(numMessages, listener.messages.size()); 510 for (int i = 0; i < numMessages; i++) { 511 assertEquals(message + i, listener.messages.get(i)); 512 } 513 shutdownAndVerify(); 514 } 515 516 @Test receivedHeadersForInvalidStreamShouldKillConnection()517 public void receivedHeadersForInvalidStreamShouldKillConnection() throws Exception { 518 initTransport(); 519 // Empty headers block without correct content type or status 520 frameHandler().headers(false, false, 3, 0, new ArrayList<Header>(), 521 HeadersMode.HTTP_20_HEADERS); 522 verify(frameWriter, timeout(TIME_OUT_MS)) 523 .goAway(eq(0), eq(ErrorCode.PROTOCOL_ERROR), any(byte[].class)); 524 verify(transportListener).transportShutdown(isA(Status.class)); 525 verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); 526 shutdownAndVerify(); 527 } 528 529 @Test receivedDataForInvalidStreamShouldKillConnection()530 public void receivedDataForInvalidStreamShouldKillConnection() throws Exception { 531 initTransport(); 532 frameHandler().data(false, 3, createMessageFrame(new String(new char[1000])), 1000); 533 verify(frameWriter, timeout(TIME_OUT_MS)) 534 .goAway(eq(0), eq(ErrorCode.PROTOCOL_ERROR), any(byte[].class)); 535 verify(transportListener).transportShutdown(isA(Status.class)); 536 verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); 537 shutdownAndVerify(); 538 } 539 540 @Test invalidInboundHeadersCancelStream()541 public void invalidInboundHeadersCancelStream() throws Exception { 542 initTransport(); 543 MockStreamListener listener = new MockStreamListener(); 544 OkHttpClientStream stream = 545 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 546 stream.start(listener); 547 stream.request(1); 548 assertContainStream(3); 549 // Headers block without correct content type or status 550 frameHandler().headers(false, false, 3, 0, Arrays.asList(new Header("random", "4")), 551 HeadersMode.HTTP_20_HEADERS); 552 // Now wait to receive 1000 bytes of data so we can have a better error message before 553 // cancelling the streaam. 554 frameHandler().data(false, 3, createMessageFrame(new String(new char[1000])), 1000); 555 verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL)); 556 assertNull(listener.headers); 557 assertEquals(Status.INTERNAL.getCode(), listener.status.getCode()); 558 assertNotNull(listener.trailers); 559 assertEquals("4", listener.trailers 560 .get(Metadata.Key.of("random", Metadata.ASCII_STRING_MARSHALLER))); 561 shutdownAndVerify(); 562 } 563 564 @Test invalidInboundTrailersPropagateToMetadata()565 public void invalidInboundTrailersPropagateToMetadata() throws Exception { 566 initTransport(); 567 MockStreamListener listener = new MockStreamListener(); 568 OkHttpClientStream stream = 569 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 570 stream.start(listener); 571 stream.request(1); 572 assertContainStream(3); 573 // Headers block with EOS without correct content type or status 574 frameHandler().headers(true, true, 3, 0, Arrays.asList(new Header("random", "4")), 575 HeadersMode.HTTP_20_HEADERS); 576 assertNull(listener.headers); 577 assertEquals(Status.INTERNAL.getCode(), listener.status.getCode()); 578 assertNotNull(listener.trailers); 579 assertEquals("4", listener.trailers 580 .get(Metadata.Key.of("random", Metadata.ASCII_STRING_MARSHALLER))); 581 shutdownAndVerify(); 582 } 583 584 @Test readStatus()585 public void readStatus() throws Exception { 586 initTransport(); 587 MockStreamListener listener = new MockStreamListener(); 588 OkHttpClientStream stream = 589 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 590 stream.start(listener); 591 assertContainStream(3); 592 frameHandler().headers(true, true, 3, 0, grpcResponseTrailers(), HeadersMode.HTTP_20_HEADERS); 593 listener.waitUntilStreamClosed(); 594 assertEquals(Status.Code.OK, listener.status.getCode()); 595 shutdownAndVerify(); 596 } 597 598 @Test receiveReset()599 public void receiveReset() throws Exception { 600 initTransport(); 601 MockStreamListener listener = new MockStreamListener(); 602 OkHttpClientStream stream = 603 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 604 stream.start(listener); 605 assertContainStream(3); 606 frameHandler().rstStream(3, ErrorCode.PROTOCOL_ERROR); 607 listener.waitUntilStreamClosed(); 608 609 assertThat(listener.status.getDescription()).contains("Rst Stream"); 610 assertThat(listener.status.getCode()).isEqualTo(Code.INTERNAL); 611 shutdownAndVerify(); 612 } 613 614 615 @Test receiveResetNoError()616 public void receiveResetNoError() throws Exception { 617 initTransport(); 618 MockStreamListener listener = new MockStreamListener(); 619 OkHttpClientStream stream = 620 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 621 stream.start(listener); 622 assertContainStream(3); 623 frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS); 624 Buffer buffer = createMessageFrame("a message"); 625 frameHandler().data(false, 3, buffer, (int) buffer.size()); 626 frameHandler().headers(true, true, 3, 0, grpcResponseTrailers(), HeadersMode.HTTP_20_HEADERS); 627 frameHandler().rstStream(3, ErrorCode.NO_ERROR); 628 stream.request(1); 629 listener.waitUntilStreamClosed(); 630 631 assertTrue(listener.status.isOk()); 632 shutdownAndVerify(); 633 } 634 635 @Test cancelStream()636 public void cancelStream() throws Exception { 637 initTransport(); 638 MockStreamListener listener = new MockStreamListener(); 639 OkHttpClientStream stream = 640 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 641 stream.start(listener); 642 getStream(3).cancel(Status.CANCELLED); 643 verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL)); 644 listener.waitUntilStreamClosed(); 645 assertEquals(OkHttpClientTransport.toGrpcStatus(ErrorCode.CANCEL).getCode(), 646 listener.status.getCode()); 647 shutdownAndVerify(); 648 } 649 650 @Test addDefaultUserAgent()651 public void addDefaultUserAgent() throws Exception { 652 initTransport(); 653 MockStreamListener listener = new MockStreamListener(); 654 OkHttpClientStream stream = 655 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 656 stream.start(listener); 657 Header userAgentHeader = new Header(GrpcUtil.USER_AGENT_KEY.name(), 658 GrpcUtil.getGrpcUserAgent("okhttp", null)); 659 List<Header> expectedHeaders = Arrays.asList(HTTP_SCHEME_HEADER, METHOD_HEADER, 660 new Header(Header.TARGET_AUTHORITY, "notarealauthority:80"), 661 new Header(Header.TARGET_PATH, "/" + method.getFullMethodName()), 662 userAgentHeader, CONTENT_TYPE_HEADER, TE_HEADER); 663 verify(frameWriter, timeout(TIME_OUT_MS)) 664 .synStream(eq(false), eq(false), eq(3), eq(0), eq(expectedHeaders)); 665 getStream(3).cancel(Status.CANCELLED); 666 shutdownAndVerify(); 667 } 668 669 @Test overrideDefaultUserAgent()670 public void overrideDefaultUserAgent() throws Exception { 671 startTransport(3, null, true, "fakeUserAgent"); 672 MockStreamListener listener = new MockStreamListener(); 673 OkHttpClientStream stream = 674 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 675 stream.start(listener); 676 List<Header> expectedHeaders = Arrays.asList(HTTP_SCHEME_HEADER, METHOD_HEADER, 677 new Header(Header.TARGET_AUTHORITY, "notarealauthority:80"), 678 new Header(Header.TARGET_PATH, "/" + method.getFullMethodName()), 679 new Header(GrpcUtil.USER_AGENT_KEY.name(), 680 GrpcUtil.getGrpcUserAgent("okhttp", "fakeUserAgent")), 681 CONTENT_TYPE_HEADER, TE_HEADER); 682 verify(frameWriter, timeout(TIME_OUT_MS)) 683 .synStream(eq(false), eq(false), eq(3), eq(0), eq(expectedHeaders)); 684 getStream(3).cancel(Status.CANCELLED); 685 shutdownAndVerify(); 686 } 687 688 @Test cancelStreamForDeadlineExceeded()689 public void cancelStreamForDeadlineExceeded() throws Exception { 690 initTransport(); 691 MockStreamListener listener = new MockStreamListener(); 692 OkHttpClientStream stream = 693 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 694 stream.start(listener); 695 getStream(3).cancel(Status.DEADLINE_EXCEEDED); 696 verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL)); 697 listener.waitUntilStreamClosed(); 698 shutdownAndVerify(); 699 } 700 701 @Test writeMessage()702 public void writeMessage() throws Exception { 703 initTransport(); 704 final String message = "Hello Server"; 705 MockStreamListener listener = new MockStreamListener(); 706 OkHttpClientStream stream = 707 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 708 stream.start(listener); 709 InputStream input = new ByteArrayInputStream(message.getBytes(UTF_8)); 710 assertEquals(12, input.available()); 711 stream.writeMessage(input); 712 stream.flush(); 713 verify(frameWriter, timeout(TIME_OUT_MS)) 714 .data(eq(false), eq(3), any(Buffer.class), eq(12 + HEADER_LENGTH)); 715 Buffer sentFrame = capturedBuffer.poll(); 716 assertEquals(createMessageFrame(message), sentFrame); 717 stream.cancel(Status.CANCELLED); 718 shutdownAndVerify(); 719 } 720 721 @Test transportTracer_windowSizeDefault()722 public void transportTracer_windowSizeDefault() throws Exception { 723 initTransport(); 724 TransportStats stats = getTransportStats(clientTransport); 725 assertEquals(INITIAL_WINDOW_SIZE / 2, stats.remoteFlowControlWindow); // Lower bound 726 assertEquals(INITIAL_WINDOW_SIZE, stats.localFlowControlWindow); 727 } 728 729 @Test transportTracer_windowSize_remote()730 public void transportTracer_windowSize_remote() throws Exception { 731 initTransport(); 732 TransportStats before = getTransportStats(clientTransport); 733 assertEquals(INITIAL_WINDOW_SIZE / 2, before.remoteFlowControlWindow); // Lower bound 734 assertEquals(INITIAL_WINDOW_SIZE, before.localFlowControlWindow); 735 736 frameHandler().windowUpdate(0, 1000); 737 TransportStats after = getTransportStats(clientTransport); 738 assertEquals(INITIAL_WINDOW_SIZE / 2, after.remoteFlowControlWindow); 739 assertEquals(INITIAL_WINDOW_SIZE + 1000, after.localFlowControlWindow); 740 } 741 742 @Test windowUpdate()743 public void windowUpdate() throws Exception { 744 initTransport(); 745 MockStreamListener listener1 = new MockStreamListener(); 746 MockStreamListener listener2 = new MockStreamListener(); 747 OkHttpClientStream stream1 = 748 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 749 stream1.start(listener1); 750 stream1.request(2); 751 752 OkHttpClientStream stream2 = 753 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 754 stream2.start(listener2); 755 stream2.request(2); 756 assertEquals(2, activeStreamCount()); 757 stream1 = getStream(3); 758 stream2 = getStream(5); 759 760 frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS); 761 frameHandler().headers(false, false, 5, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS); 762 763 int messageLength = INITIAL_WINDOW_SIZE / 4; 764 byte[] fakeMessage = new byte[messageLength]; 765 766 // Stream 1 receives a message 767 Buffer buffer = createMessageFrame(fakeMessage); 768 int messageFrameLength = (int) buffer.size(); 769 frameHandler().data(false, 3, buffer, messageFrameLength); 770 771 // Stream 2 receives a message 772 buffer = createMessageFrame(fakeMessage); 773 frameHandler().data(false, 5, buffer, messageFrameLength); 774 775 verify(frameWriter, timeout(TIME_OUT_MS)) 776 .windowUpdate(eq(0), eq((long) 2 * messageFrameLength)); 777 reset(frameWriter); 778 779 // Stream 1 receives another message 780 buffer = createMessageFrame(fakeMessage); 781 frameHandler().data(false, 3, buffer, messageFrameLength); 782 783 verify(frameWriter, timeout(TIME_OUT_MS)) 784 .windowUpdate(eq(3), eq((long) 2 * messageFrameLength)); 785 786 // Stream 2 receives another message 787 buffer = createMessageFrame(fakeMessage); 788 frameHandler().data(false, 5, buffer, messageFrameLength); 789 790 verify(frameWriter, timeout(TIME_OUT_MS)) 791 .windowUpdate(eq(5), eq((long) 2 * messageFrameLength)); 792 verify(frameWriter, timeout(TIME_OUT_MS)) 793 .windowUpdate(eq(0), eq((long) 2 * messageFrameLength)); 794 795 stream1.cancel(Status.CANCELLED); 796 verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL)); 797 listener1.waitUntilStreamClosed(); 798 assertEquals(OkHttpClientTransport.toGrpcStatus(ErrorCode.CANCEL).getCode(), 799 listener1.status.getCode()); 800 801 stream2.cancel(Status.CANCELLED); 802 verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(5), eq(ErrorCode.CANCEL)); 803 listener2.waitUntilStreamClosed(); 804 assertEquals(OkHttpClientTransport.toGrpcStatus(ErrorCode.CANCEL).getCode(), 805 listener2.status.getCode()); 806 shutdownAndVerify(); 807 } 808 809 @Test windowUpdateWithInboundFlowControl()810 public void windowUpdateWithInboundFlowControl() throws Exception { 811 initTransport(); 812 MockStreamListener listener = new MockStreamListener(); 813 OkHttpClientStream stream = 814 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 815 stream.start(listener); 816 int messageLength = INITIAL_WINDOW_SIZE / 2 + 1; 817 byte[] fakeMessage = new byte[messageLength]; 818 819 frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS); 820 Buffer buffer = createMessageFrame(fakeMessage); 821 long messageFrameLength = buffer.size(); 822 frameHandler().data(false, 3, buffer, (int) messageFrameLength); 823 ArgumentCaptor<Integer> idCaptor = ArgumentCaptor.forClass(Integer.class); 824 verify(frameWriter, timeout(TIME_OUT_MS)).windowUpdate( 825 idCaptor.capture(), eq(messageFrameLength)); 826 // Should only send window update for the connection. 827 assertEquals(1, idCaptor.getAllValues().size()); 828 assertEquals(0, (int)idCaptor.getValue()); 829 830 stream.request(1); 831 // We return the bytes for the stream window as we read the message. 832 verify(frameWriter, timeout(TIME_OUT_MS)).windowUpdate(eq(3), eq(messageFrameLength)); 833 834 getStream(3).cancel(Status.CANCELLED); 835 verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL)); 836 listener.waitUntilStreamClosed(); 837 assertEquals(OkHttpClientTransport.toGrpcStatus(ErrorCode.CANCEL).getCode(), 838 listener.status.getCode()); 839 shutdownAndVerify(); 840 } 841 842 /** 843 * Outbound flow control where the initial flow control window stays at the default size of 65535. 844 */ 845 @Test outboundFlowControl()846 public void outboundFlowControl() throws Exception { 847 initTransport(); 848 MockStreamListener listener = new MockStreamListener(); 849 OkHttpClientStream stream = 850 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 851 stream.start(listener); 852 853 // Outbound window always starts at 65535 until changed by Settings.INITIAL_WINDOW_SIZE 854 int initialOutboundWindowSize = 65535; 855 int messageLength = initialOutboundWindowSize / 2 + 1; 856 857 // The first message should be sent out. 858 InputStream input = new ByteArrayInputStream(new byte[messageLength]); 859 stream.writeMessage(input); 860 stream.flush(); 861 verify(frameWriter, timeout(TIME_OUT_MS)).data( 862 eq(false), eq(3), any(Buffer.class), eq(messageLength + HEADER_LENGTH)); 863 864 // The second message should be partially sent out. 865 input = new ByteArrayInputStream(new byte[messageLength]); 866 stream.writeMessage(input); 867 stream.flush(); 868 int partiallySentSize = initialOutboundWindowSize - messageLength - HEADER_LENGTH; 869 verify(frameWriter, timeout(TIME_OUT_MS)) 870 .data(eq(false), eq(3), any(Buffer.class), eq(partiallySentSize)); 871 872 // Get more credit so the rest of the data should be sent out. 873 frameHandler().windowUpdate(3, initialOutboundWindowSize); 874 frameHandler().windowUpdate(0, initialOutboundWindowSize); 875 verify(frameWriter, timeout(TIME_OUT_MS)).data( 876 eq(false), eq(3), any(Buffer.class), 877 eq(messageLength + HEADER_LENGTH - partiallySentSize)); 878 879 stream.cancel(Status.CANCELLED); 880 listener.waitUntilStreamClosed(); 881 shutdownAndVerify(); 882 } 883 884 /** 885 * Outbound flow control where the initial window size is reduced before a stream is started. 886 */ 887 @Test outboundFlowControl_smallWindowSize()888 public void outboundFlowControl_smallWindowSize() throws Exception { 889 initTransport(); 890 891 int initialOutboundWindowSize = 100; 892 setInitialWindowSize(initialOutboundWindowSize); 893 894 MockStreamListener listener = new MockStreamListener(); 895 OkHttpClientStream stream = 896 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 897 stream.start(listener); 898 899 int messageLength = 75; 900 // The first message should be sent out. 901 InputStream input = new ByteArrayInputStream(new byte[messageLength]); 902 stream.writeMessage(input); 903 stream.flush(); 904 verify(frameWriter, timeout(TIME_OUT_MS)).data( 905 eq(false), eq(3), any(Buffer.class), eq(messageLength + HEADER_LENGTH)); 906 907 // The second message should be partially sent out. 908 input = new ByteArrayInputStream(new byte[messageLength]); 909 stream.writeMessage(input); 910 stream.flush(); 911 int partiallySentSize = initialOutboundWindowSize - messageLength - HEADER_LENGTH; 912 verify(frameWriter, timeout(TIME_OUT_MS)) 913 .data(eq(false), eq(3), any(Buffer.class), eq(partiallySentSize)); 914 915 // Get more credit so the rest of the data should be sent out. 916 frameHandler().windowUpdate(3, initialOutboundWindowSize); 917 verify(frameWriter, timeout(TIME_OUT_MS)).data( 918 eq(false), eq(3), any(Buffer.class), 919 eq(messageLength + HEADER_LENGTH - partiallySentSize)); 920 921 stream.cancel(Status.CANCELLED); 922 listener.waitUntilStreamClosed(); 923 shutdownAndVerify(); 924 } 925 926 /** 927 * Outbound flow control where the initial window size is increased before a stream is started. 928 */ 929 @Test outboundFlowControl_bigWindowSize()930 public void outboundFlowControl_bigWindowSize() throws Exception { 931 initTransport(); 932 933 int initialOutboundWindowSize = 131070; // 65535 * 2 934 setInitialWindowSize(initialOutboundWindowSize); 935 frameHandler().windowUpdate(0, 65535); 936 937 MockStreamListener listener = new MockStreamListener(); 938 OkHttpClientStream stream = 939 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 940 stream.start(listener); 941 942 int messageLength = 100000; 943 // The first message should be sent out. 944 InputStream input = new ByteArrayInputStream(new byte[messageLength]); 945 stream.writeMessage(input); 946 stream.flush(); 947 verify(frameWriter, timeout(TIME_OUT_MS)).data( 948 eq(false), eq(3), any(Buffer.class), eq(messageLength + HEADER_LENGTH)); 949 950 // The second message should be partially sent out. 951 input = new ByteArrayInputStream(new byte[messageLength]); 952 stream.writeMessage(input); 953 stream.flush(); 954 int partiallySentSize = initialOutboundWindowSize - messageLength - HEADER_LENGTH; 955 verify(frameWriter, timeout(TIME_OUT_MS)) 956 .data(eq(false), eq(3), any(Buffer.class), eq(partiallySentSize)); 957 958 // Get more credit so the rest of the data should be sent out. 959 frameHandler().windowUpdate(0, initialOutboundWindowSize); 960 frameHandler().windowUpdate(3, initialOutboundWindowSize); 961 verify(frameWriter, timeout(TIME_OUT_MS)).data( 962 eq(false), eq(3), any(Buffer.class), 963 eq(messageLength + HEADER_LENGTH - partiallySentSize)); 964 965 stream.cancel(Status.CANCELLED); 966 listener.waitUntilStreamClosed(); 967 shutdownAndVerify(); 968 } 969 970 @Test outboundFlowControlWithInitialWindowSizeChange()971 public void outboundFlowControlWithInitialWindowSizeChange() throws Exception { 972 initTransport(); 973 MockStreamListener listener = new MockStreamListener(); 974 OkHttpClientStream stream = 975 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 976 stream.start(listener); 977 int messageLength = 20; 978 setInitialWindowSize(HEADER_LENGTH + 10); 979 InputStream input = new ByteArrayInputStream(new byte[messageLength]); 980 stream.writeMessage(input); 981 stream.flush(); 982 // part of the message can be sent. 983 verify(frameWriter, timeout(TIME_OUT_MS)) 984 .data(eq(false), eq(3), any(Buffer.class), eq(HEADER_LENGTH + 10)); 985 // Avoid connection flow control. 986 frameHandler().windowUpdate(0, HEADER_LENGTH + 10); 987 988 // Increase initial window size 989 setInitialWindowSize(HEADER_LENGTH + 20); 990 // The rest data should be sent. 991 verify(frameWriter, timeout(TIME_OUT_MS)).data(eq(false), eq(3), any(Buffer.class), eq(10)); 992 frameHandler().windowUpdate(0, 10); 993 994 // Decrease initial window size to HEADER_LENGTH, since we've already sent 995 // out HEADER_LENGTH + 20 bytes data, the window size should be -20 now. 996 setInitialWindowSize(HEADER_LENGTH); 997 // Get 20 tokens back, still can't send any data. 998 frameHandler().windowUpdate(3, 20); 999 input = new ByteArrayInputStream(new byte[messageLength]); 1000 stream.writeMessage(input); 1001 stream.flush(); 1002 // Only the previous two write operations happened. 1003 verify(frameWriter, timeout(TIME_OUT_MS).times(2)) 1004 .data(anyBoolean(), anyInt(), any(Buffer.class), anyInt()); 1005 1006 // Get enough tokens to send the pending message. 1007 frameHandler().windowUpdate(3, HEADER_LENGTH + 20); 1008 verify(frameWriter, timeout(TIME_OUT_MS)) 1009 .data(eq(false), eq(3), any(Buffer.class), eq(HEADER_LENGTH + 20)); 1010 1011 stream.cancel(Status.CANCELLED); 1012 listener.waitUntilStreamClosed(); 1013 shutdownAndVerify(); 1014 } 1015 1016 @Test outboundFlowControlWithInitialWindowSizeChangeInMiddleOfStream()1017 public void outboundFlowControlWithInitialWindowSizeChangeInMiddleOfStream() throws Exception { 1018 initTransport(); 1019 MockStreamListener listener = new MockStreamListener(); 1020 OkHttpClientStream stream = 1021 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 1022 stream.start(listener); 1023 int messageLength = 20; 1024 setInitialWindowSize(HEADER_LENGTH + 10); 1025 InputStream input = new ByteArrayInputStream(new byte[messageLength]); 1026 stream.writeMessage(input); 1027 stream.flush(); 1028 // part of the message can be sent. 1029 verify(frameWriter, timeout(TIME_OUT_MS)) 1030 .data(eq(false), eq(3), any(Buffer.class), eq(HEADER_LENGTH + 10)); 1031 // Avoid connection flow control. 1032 frameHandler().windowUpdate(0, HEADER_LENGTH + 20); 1033 1034 // Increase initial window size 1035 setInitialWindowSize(HEADER_LENGTH + 20); 1036 1037 // wait until pending frames sent (inOrder doesn't support timeout) 1038 verify(frameWriter, timeout(TIME_OUT_MS).atLeastOnce()) 1039 .data(eq(false), eq(3), any(Buffer.class), eq(10)); 1040 // It should ack the settings, then send remaining message. 1041 InOrder inOrder = inOrder(frameWriter); 1042 inOrder.verify(frameWriter).ackSettings(any(Settings.class)); 1043 inOrder.verify(frameWriter).data(eq(false), eq(3), any(Buffer.class), eq(10)); 1044 1045 stream.cancel(Status.CANCELLED); 1046 listener.waitUntilStreamClosed(); 1047 shutdownAndVerify(); 1048 } 1049 1050 @Test stopNormally()1051 public void stopNormally() throws Exception { 1052 initTransport(); 1053 MockStreamListener listener1 = new MockStreamListener(); 1054 MockStreamListener listener2 = new MockStreamListener(); 1055 OkHttpClientStream stream1 = 1056 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 1057 stream1.start(listener1); 1058 OkHttpClientStream stream2 = 1059 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 1060 stream2.start(listener2); 1061 assertEquals(2, activeStreamCount()); 1062 clientTransport.shutdown(SHUTDOWN_REASON); 1063 1064 assertEquals(2, activeStreamCount()); 1065 verify(transportListener).transportShutdown(same(SHUTDOWN_REASON)); 1066 1067 stream1.cancel(Status.CANCELLED); 1068 stream2.cancel(Status.CANCELLED); 1069 listener1.waitUntilStreamClosed(); 1070 listener2.waitUntilStreamClosed(); 1071 assertEquals(0, activeStreamCount()); 1072 assertEquals(Status.CANCELLED.getCode(), listener1.status.getCode()); 1073 assertEquals(Status.CANCELLED.getCode(), listener2.status.getCode()); 1074 verify(frameWriter, timeout(TIME_OUT_MS)).goAway(eq(0), eq(ErrorCode.NO_ERROR), (byte[]) any()); 1075 verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); 1076 shutdownAndVerify(); 1077 } 1078 1079 @Test receiveGoAway()1080 public void receiveGoAway() throws Exception { 1081 initTransport(); 1082 // start 2 streams. 1083 MockStreamListener listener1 = new MockStreamListener(); 1084 MockStreamListener listener2 = new MockStreamListener(); 1085 OkHttpClientStream stream1 = 1086 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 1087 stream1.start(listener1); 1088 stream1.request(1); 1089 OkHttpClientStream stream2 = 1090 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 1091 stream2.start(listener2); 1092 stream2.request(1); 1093 assertEquals(2, activeStreamCount()); 1094 1095 // Receive goAway, max good id is 3. 1096 frameHandler().goAway(3, ErrorCode.CANCEL, ByteString.EMPTY); 1097 1098 // Transport should be in STOPPING state. 1099 verify(transportListener).transportShutdown(isA(Status.class)); 1100 verify(transportListener, never()).transportTerminated(); 1101 1102 // Stream 2 should be closed. 1103 listener2.waitUntilStreamClosed(); 1104 assertEquals(1, activeStreamCount()); 1105 assertEquals(Status.CANCELLED.getCode(), listener2.status.getCode()); 1106 1107 // New stream should be failed. 1108 assertNewStreamFail(); 1109 1110 // But stream 1 should be able to send. 1111 final String sentMessage = "Should I also go away?"; 1112 OkHttpClientStream stream = getStream(3); 1113 InputStream input = new ByteArrayInputStream(sentMessage.getBytes(UTF_8)); 1114 assertEquals(22, input.available()); 1115 stream.writeMessage(input); 1116 stream.flush(); 1117 verify(frameWriter, timeout(TIME_OUT_MS)) 1118 .data(eq(false), eq(3), any(Buffer.class), eq(22 + HEADER_LENGTH)); 1119 Buffer sentFrame = capturedBuffer.poll(); 1120 assertEquals(createMessageFrame(sentMessage), sentFrame); 1121 1122 // And read. 1123 frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS); 1124 final String receivedMessage = "No, you are fine."; 1125 Buffer buffer = createMessageFrame(receivedMessage); 1126 frameHandler().data(false, 3, buffer, (int) buffer.size()); 1127 frameHandler().headers(true, true, 3, 0, grpcResponseTrailers(), HeadersMode.HTTP_20_HEADERS); 1128 listener1.waitUntilStreamClosed(); 1129 assertEquals(1, listener1.messages.size()); 1130 assertEquals(receivedMessage, listener1.messages.get(0)); 1131 1132 // The transport should be stopped after all active streams finished. 1133 verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); 1134 shutdownAndVerify(); 1135 } 1136 1137 @Test streamIdExhausted()1138 public void streamIdExhausted() throws Exception { 1139 int startId = Integer.MAX_VALUE - 2; 1140 initTransport(startId); 1141 1142 MockStreamListener listener = new MockStreamListener(); 1143 OkHttpClientStream stream = 1144 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 1145 stream.start(listener); 1146 stream.request(1); 1147 1148 // New stream should be failed. 1149 assertNewStreamFail(); 1150 1151 // The alive stream should be functional, receives a message. 1152 frameHandler().headers( 1153 false, false, startId, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS); 1154 assertNotNull(listener.headers); 1155 String message = "hello"; 1156 Buffer buffer = createMessageFrame(message); 1157 frameHandler().data(false, startId, buffer, (int) buffer.size()); 1158 1159 getStream(startId).cancel(Status.CANCELLED); 1160 // Receives the second message after be cancelled. 1161 buffer = createMessageFrame(message); 1162 frameHandler().data(false, startId, buffer, (int) buffer.size()); 1163 1164 listener.waitUntilStreamClosed(); 1165 // Should only have the first message delivered. 1166 assertEquals(message, listener.messages.get(0)); 1167 verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(startId), eq(ErrorCode.CANCEL)); 1168 verify(transportListener).transportShutdown(isA(Status.class)); 1169 verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); 1170 shutdownAndVerify(); 1171 } 1172 1173 @Test pendingStreamSucceed()1174 public void pendingStreamSucceed() throws Exception { 1175 initTransport(); 1176 setMaxConcurrentStreams(1); 1177 final MockStreamListener listener1 = new MockStreamListener(); 1178 final MockStreamListener listener2 = new MockStreamListener(); 1179 OkHttpClientStream stream1 = 1180 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 1181 stream1.start(listener1); 1182 // The second stream should be pending. 1183 OkHttpClientStream stream2 = 1184 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 1185 stream2.start(listener2); 1186 String sentMessage = "hello"; 1187 InputStream input = new ByteArrayInputStream(sentMessage.getBytes(UTF_8)); 1188 assertEquals(5, input.available()); 1189 stream2.writeMessage(input); 1190 stream2.flush(); 1191 stream2.halfClose(); 1192 1193 waitForStreamPending(1); 1194 assertEquals(1, activeStreamCount()); 1195 1196 // Finish the first stream 1197 stream1.cancel(Status.CANCELLED); 1198 listener1.waitUntilStreamClosed(); 1199 1200 // The second stream should be active now, and the pending data should be sent out. 1201 assertEquals(1, activeStreamCount()); 1202 assertEquals(0, clientTransport.getPendingStreamSize()); 1203 verify(frameWriter, timeout(TIME_OUT_MS)) 1204 .data(eq(true), eq(5), any(Buffer.class), eq(5 + HEADER_LENGTH)); 1205 Buffer sentFrame = capturedBuffer.poll(); 1206 assertEquals(createMessageFrame(sentMessage), sentFrame); 1207 stream2.cancel(Status.CANCELLED); 1208 shutdownAndVerify(); 1209 } 1210 1211 @Test pendingStreamCancelled()1212 public void pendingStreamCancelled() throws Exception { 1213 initTransport(); 1214 setMaxConcurrentStreams(0); 1215 MockStreamListener listener = new MockStreamListener(); 1216 OkHttpClientStream stream = 1217 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 1218 stream.start(listener); 1219 waitForStreamPending(1); 1220 stream.cancel(Status.CANCELLED); 1221 // The second cancel should be an no-op. 1222 stream.cancel(Status.UNKNOWN); 1223 listener.waitUntilStreamClosed(); 1224 assertEquals(0, clientTransport.getPendingStreamSize()); 1225 assertEquals(Status.CANCELLED.getCode(), listener.status.getCode()); 1226 shutdownAndVerify(); 1227 } 1228 1229 @Test pendingStreamFailedByGoAway()1230 public void pendingStreamFailedByGoAway() throws Exception { 1231 initTransport(); 1232 setMaxConcurrentStreams(1); 1233 final MockStreamListener listener1 = new MockStreamListener(); 1234 final MockStreamListener listener2 = new MockStreamListener(); 1235 OkHttpClientStream stream1 = 1236 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 1237 stream1.start(listener1); 1238 // The second stream should be pending. 1239 OkHttpClientStream stream2 = 1240 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 1241 stream2.start(listener2); 1242 1243 waitForStreamPending(1); 1244 assertEquals(1, activeStreamCount()); 1245 1246 // Receives GO_AWAY. 1247 frameHandler().goAway(99, ErrorCode.CANCEL, ByteString.EMPTY); 1248 1249 listener2.waitUntilStreamClosed(); 1250 assertEquals(Status.CANCELLED.getCode(), listener2.status.getCode()); 1251 assertEquals(0, clientTransport.getPendingStreamSize()); 1252 1253 // active stream should not be affected. 1254 assertEquals(1, activeStreamCount()); 1255 getStream(3).cancel(Status.CANCELLED); 1256 shutdownAndVerify(); 1257 } 1258 1259 @Test pendingStreamSucceedAfterShutdown()1260 public void pendingStreamSucceedAfterShutdown() throws Exception { 1261 initTransport(); 1262 setMaxConcurrentStreams(0); 1263 final MockStreamListener listener = new MockStreamListener(); 1264 // The second stream should be pending. 1265 OkHttpClientStream stream = 1266 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 1267 stream.start(listener); 1268 waitForStreamPending(1); 1269 1270 clientTransport.shutdown(SHUTDOWN_REASON); 1271 setMaxConcurrentStreams(1); 1272 verify(frameWriter, timeout(TIME_OUT_MS)) 1273 .synStream(anyBoolean(), anyBoolean(), eq(3), anyInt(), anyListHeader()); 1274 assertEquals(1, activeStreamCount()); 1275 stream.cancel(Status.CANCELLED); 1276 shutdownAndVerify(); 1277 } 1278 1279 @Test pendingStreamFailedByIdExhausted()1280 public void pendingStreamFailedByIdExhausted() throws Exception { 1281 int startId = Integer.MAX_VALUE - 4; 1282 initTransport(startId); 1283 setMaxConcurrentStreams(1); 1284 1285 final MockStreamListener listener1 = new MockStreamListener(); 1286 final MockStreamListener listener2 = new MockStreamListener(); 1287 final MockStreamListener listener3 = new MockStreamListener(); 1288 1289 OkHttpClientStream stream1 = 1290 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 1291 stream1.start(listener1); 1292 1293 // The second and third stream should be pending. 1294 OkHttpClientStream stream2 = 1295 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 1296 stream2.start(listener2); 1297 OkHttpClientStream stream3 = 1298 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 1299 stream3.start(listener3); 1300 1301 waitForStreamPending(2); 1302 assertEquals(1, activeStreamCount()); 1303 1304 // Now finish stream1, stream2 should be started and exhaust the id, 1305 // so stream3 should be failed. 1306 stream1.cancel(Status.CANCELLED); 1307 listener1.waitUntilStreamClosed(); 1308 listener3.waitUntilStreamClosed(); 1309 assertEquals(Status.UNAVAILABLE.getCode(), listener3.status.getCode()); 1310 assertEquals(0, clientTransport.getPendingStreamSize()); 1311 assertEquals(1, activeStreamCount()); 1312 stream2 = getStream(startId + 2); 1313 stream2.cancel(Status.CANCELLED); 1314 shutdownAndVerify(); 1315 } 1316 1317 @Test receivingWindowExceeded()1318 public void receivingWindowExceeded() throws Exception { 1319 initTransport(); 1320 MockStreamListener listener = new MockStreamListener(); 1321 OkHttpClientStream stream = 1322 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 1323 stream.start(listener); 1324 stream.request(1); 1325 1326 frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS); 1327 1328 int messageLength = INITIAL_WINDOW_SIZE + 1; 1329 byte[] fakeMessage = new byte[messageLength]; 1330 Buffer buffer = createMessageFrame(fakeMessage); 1331 int messageFrameLength = (int) buffer.size(); 1332 frameHandler().data(false, 3, buffer, messageFrameLength); 1333 1334 listener.waitUntilStreamClosed(); 1335 assertEquals(Status.INTERNAL.getCode(), listener.status.getCode()); 1336 assertEquals("Received data size exceeded our receiving window size", 1337 listener.status.getDescription()); 1338 verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.FLOW_CONTROL_ERROR)); 1339 shutdownAndVerify(); 1340 } 1341 1342 @Test unaryHeadersShouldNotBeFlushed()1343 public void unaryHeadersShouldNotBeFlushed() throws Exception { 1344 // By default the method is a Unary call 1345 shouldHeadersBeFlushed(false); 1346 shutdownAndVerify(); 1347 } 1348 1349 @Test serverStreamingHeadersShouldNotBeFlushed()1350 public void serverStreamingHeadersShouldNotBeFlushed() throws Exception { 1351 method = method.toBuilder().setType(MethodType.SERVER_STREAMING).build(); 1352 shouldHeadersBeFlushed(false); 1353 shutdownAndVerify(); 1354 } 1355 1356 @Test clientStreamingHeadersShouldBeFlushed()1357 public void clientStreamingHeadersShouldBeFlushed() throws Exception { 1358 method = method.toBuilder().setType(MethodType.CLIENT_STREAMING).build(); 1359 shouldHeadersBeFlushed(true); 1360 shutdownAndVerify(); 1361 } 1362 1363 @Test duplexStreamingHeadersShouldNotBeFlushed()1364 public void duplexStreamingHeadersShouldNotBeFlushed() throws Exception { 1365 method = method.toBuilder().setType(MethodType.BIDI_STREAMING).build(); 1366 shouldHeadersBeFlushed(true); 1367 shutdownAndVerify(); 1368 } 1369 shouldHeadersBeFlushed(boolean shouldBeFlushed)1370 private void shouldHeadersBeFlushed(boolean shouldBeFlushed) throws Exception { 1371 initTransport(); 1372 MockStreamListener listener = new MockStreamListener(); 1373 OkHttpClientStream stream = 1374 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 1375 stream.start(listener); 1376 verify(frameWriter, timeout(TIME_OUT_MS)).synStream( 1377 eq(false), eq(false), eq(3), eq(0), ArgumentMatchers.<Header>anyList()); 1378 if (shouldBeFlushed) { 1379 verify(frameWriter, timeout(TIME_OUT_MS)).flush(); 1380 } else { 1381 verify(frameWriter, timeout(TIME_OUT_MS).times(0)).flush(); 1382 } 1383 stream.cancel(Status.CANCELLED); 1384 } 1385 1386 @Test receiveDataWithoutHeader()1387 public void receiveDataWithoutHeader() throws Exception { 1388 initTransport(); 1389 MockStreamListener listener = new MockStreamListener(); 1390 OkHttpClientStream stream = 1391 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 1392 stream.start(listener); 1393 stream.request(1); 1394 Buffer buffer = createMessageFrame(new byte[1]); 1395 frameHandler().data(false, 3, buffer, (int) buffer.size()); 1396 1397 // Trigger the failure by a trailer. 1398 frameHandler().headers( 1399 true, true, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS); 1400 1401 listener.waitUntilStreamClosed(); 1402 assertEquals(Status.INTERNAL.getCode(), listener.status.getCode()); 1403 assertTrue(listener.status.getDescription().startsWith("headers not received before payload")); 1404 assertEquals(0, listener.messages.size()); 1405 shutdownAndVerify(); 1406 } 1407 1408 @Test receiveDataWithoutHeaderAndTrailer()1409 public void receiveDataWithoutHeaderAndTrailer() throws Exception { 1410 initTransport(); 1411 MockStreamListener listener = new MockStreamListener(); 1412 OkHttpClientStream stream = 1413 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 1414 stream.start(listener); 1415 stream.request(1); 1416 Buffer buffer = createMessageFrame(new byte[1]); 1417 frameHandler().data(false, 3, buffer, (int) buffer.size()); 1418 1419 // Trigger the failure by a data frame. 1420 buffer = createMessageFrame(new byte[1]); 1421 frameHandler().data(true, 3, buffer, (int) buffer.size()); 1422 1423 listener.waitUntilStreamClosed(); 1424 assertEquals(Status.INTERNAL.getCode(), listener.status.getCode()); 1425 assertTrue(listener.status.getDescription().startsWith("headers not received before payload")); 1426 assertEquals(0, listener.messages.size()); 1427 shutdownAndVerify(); 1428 } 1429 1430 @Test receiveLongEnoughDataWithoutHeaderAndTrailer()1431 public void receiveLongEnoughDataWithoutHeaderAndTrailer() throws Exception { 1432 initTransport(); 1433 MockStreamListener listener = new MockStreamListener(); 1434 OkHttpClientStream stream = 1435 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 1436 stream.start(listener); 1437 stream.request(1); 1438 Buffer buffer = createMessageFrame(new byte[1000]); 1439 frameHandler().data(false, 3, buffer, (int) buffer.size()); 1440 1441 // Once we receive enough detail, we cancel the stream. so we should have sent cancel. 1442 verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL)); 1443 1444 listener.waitUntilStreamClosed(); 1445 assertEquals(Status.INTERNAL.getCode(), listener.status.getCode()); 1446 assertTrue(listener.status.getDescription().startsWith("headers not received before payload")); 1447 assertEquals(0, listener.messages.size()); 1448 shutdownAndVerify(); 1449 } 1450 1451 @Test receiveDataForUnknownStreamUpdateConnectionWindow()1452 public void receiveDataForUnknownStreamUpdateConnectionWindow() throws Exception { 1453 initTransport(); 1454 MockStreamListener listener = new MockStreamListener(); 1455 OkHttpClientStream stream = 1456 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 1457 stream.start(listener); 1458 stream.cancel(Status.CANCELLED); 1459 1460 Buffer buffer = createMessageFrame( 1461 new byte[INITIAL_WINDOW_SIZE / 2 + 1]); 1462 frameHandler().data(false, 3, buffer, (int) buffer.size()); 1463 // Should still update the connection window even stream 3 is gone. 1464 verify(frameWriter, timeout(TIME_OUT_MS)).windowUpdate(0, 1465 HEADER_LENGTH + INITIAL_WINDOW_SIZE / 2 + 1); 1466 buffer = createMessageFrame( 1467 new byte[INITIAL_WINDOW_SIZE / 2 + 1]); 1468 1469 // This should kill the connection, since we never created stream 5. 1470 frameHandler().data(false, 5, buffer, (int) buffer.size()); 1471 verify(frameWriter, timeout(TIME_OUT_MS)) 1472 .goAway(eq(0), eq(ErrorCode.PROTOCOL_ERROR), any(byte[].class)); 1473 verify(transportListener).transportShutdown(isA(Status.class)); 1474 verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); 1475 shutdownAndVerify(); 1476 } 1477 1478 @Test receiveWindowUpdateForUnknownStream()1479 public void receiveWindowUpdateForUnknownStream() throws Exception { 1480 initTransport(); 1481 MockStreamListener listener = new MockStreamListener(); 1482 OkHttpClientStream stream = 1483 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 1484 stream.start(listener); 1485 stream.cancel(Status.CANCELLED); 1486 // This should be ignored. 1487 frameHandler().windowUpdate(3, 73); 1488 listener.waitUntilStreamClosed(); 1489 // This should kill the connection, since we never created stream 5. 1490 frameHandler().windowUpdate(5, 73); 1491 verify(frameWriter, timeout(TIME_OUT_MS)) 1492 .goAway(eq(0), eq(ErrorCode.PROTOCOL_ERROR), any(byte[].class)); 1493 verify(transportListener).transportShutdown(isA(Status.class)); 1494 verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); 1495 shutdownAndVerify(); 1496 } 1497 1498 @Test shouldBeInitiallyReady()1499 public void shouldBeInitiallyReady() throws Exception { 1500 initTransport(); 1501 MockStreamListener listener = new MockStreamListener(); 1502 OkHttpClientStream stream = 1503 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 1504 stream.start(listener); 1505 assertTrue(stream.isReady()); 1506 assertTrue(listener.isOnReadyCalled()); 1507 stream.cancel(Status.CANCELLED); 1508 assertFalse(stream.isReady()); 1509 shutdownAndVerify(); 1510 } 1511 1512 @Test notifyOnReady()1513 public void notifyOnReady() throws Exception { 1514 initTransport(); 1515 // exactly one byte below the threshold 1516 int messageLength = 1517 AbstractStream.TransportState.DEFAULT_ONREADY_THRESHOLD - HEADER_LENGTH - 1; 1518 setInitialWindowSize(0); 1519 MockStreamListener listener = new MockStreamListener(); 1520 OkHttpClientStream stream = 1521 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 1522 stream.start(listener); 1523 assertTrue(stream.isReady()); 1524 // Be notified at the beginning. 1525 assertTrue(listener.isOnReadyCalled()); 1526 1527 // Write a message that will not exceed the notification threshold and queue it. 1528 InputStream input = new ByteArrayInputStream(new byte[messageLength]); 1529 stream.writeMessage(input); 1530 stream.flush(); 1531 assertTrue(stream.isReady()); 1532 1533 // Write another two messages, still be queued. 1534 input = new ByteArrayInputStream(new byte[messageLength]); 1535 stream.writeMessage(input); 1536 stream.flush(); 1537 assertFalse(stream.isReady()); 1538 input = new ByteArrayInputStream(new byte[messageLength]); 1539 stream.writeMessage(input); 1540 stream.flush(); 1541 assertFalse(stream.isReady()); 1542 1543 // Let the first message out. 1544 frameHandler().windowUpdate(0, HEADER_LENGTH + messageLength); 1545 frameHandler().windowUpdate(3, HEADER_LENGTH + messageLength); 1546 assertFalse(stream.isReady()); 1547 assertFalse(listener.isOnReadyCalled()); 1548 1549 // Let the second message out. 1550 frameHandler().windowUpdate(0, HEADER_LENGTH + messageLength); 1551 frameHandler().windowUpdate(3, HEADER_LENGTH + messageLength); 1552 assertTrue(stream.isReady()); 1553 assertTrue(listener.isOnReadyCalled()); 1554 1555 stream.cancel(Status.CANCELLED); 1556 shutdownAndVerify(); 1557 } 1558 1559 @Test transportReady()1560 public void transportReady() throws Exception { 1561 initTransport(); 1562 verifyNoInteractions(transportListener); 1563 frameHandler().settings(false, new Settings()); 1564 verify(transportListener).transportReady(); 1565 shutdownAndVerify(); 1566 } 1567 1568 @Test ping()1569 public void ping() throws Exception { 1570 initTransport(); 1571 PingCallbackImpl callback1 = new PingCallbackImpl(); 1572 clientTransport.ping(callback1, MoreExecutors.directExecutor()); 1573 assertEquals(1, getTransportStats(clientTransport).keepAlivesSent); 1574 // add'l ping will be added as listener to outstanding operation 1575 PingCallbackImpl callback2 = new PingCallbackImpl(); 1576 clientTransport.ping(callback2, MoreExecutors.directExecutor()); 1577 assertEquals(1, getTransportStats(clientTransport).keepAlivesSent); 1578 1579 ArgumentCaptor<Integer> captor1 = ArgumentCaptor.forClass(int.class); 1580 ArgumentCaptor<Integer> captor2 = ArgumentCaptor.forClass(int.class); 1581 verify(frameWriter, timeout(TIME_OUT_MS)).ping(eq(false), captor1.capture(), captor2.capture()); 1582 // callback not invoked until we see acknowledgement 1583 assertEquals(0, callback1.invocationCount); 1584 assertEquals(0, callback2.invocationCount); 1585 1586 int payload1 = captor1.getValue(); 1587 int payload2 = captor2.getValue(); 1588 // getting a bad ack won't complete the future 1589 // to make the ack "bad", we modify the payload so it doesn't match 1590 frameHandler().ping(true, payload1, payload2 - 1); 1591 // operation not complete because ack was wrong 1592 assertEquals(0, callback1.invocationCount); 1593 assertEquals(0, callback2.invocationCount); 1594 1595 nanoTime += 10101; 1596 1597 // reading the proper response should complete the future 1598 frameHandler().ping(true, payload1, payload2); 1599 assertEquals(1, callback1.invocationCount); 1600 assertEquals(10101, callback1.roundTripTime); 1601 assertNull(callback1.failureCause); 1602 // callback2 piggy-backed on same operation 1603 assertEquals(1, callback2.invocationCount); 1604 assertEquals(10101, callback2.roundTripTime); 1605 assertNull(callback2.failureCause); 1606 1607 // now that previous ping is done, next request returns a different future 1608 callback1 = new PingCallbackImpl(); 1609 clientTransport.ping(callback1, MoreExecutors.directExecutor()); 1610 assertEquals(2, getTransportStats(clientTransport).keepAlivesSent); 1611 assertEquals(0, callback1.invocationCount); 1612 shutdownAndVerify(); 1613 } 1614 1615 @Test ping_failsWhenTransportShutdown()1616 public void ping_failsWhenTransportShutdown() throws Exception { 1617 initTransport(); 1618 PingCallbackImpl callback = new PingCallbackImpl(); 1619 clientTransport.ping(callback, MoreExecutors.directExecutor()); 1620 assertEquals(1, getTransportStats(clientTransport).keepAlivesSent); 1621 assertEquals(0, callback.invocationCount); 1622 1623 clientTransport.shutdown(SHUTDOWN_REASON); 1624 // ping failed on channel shutdown 1625 assertEquals(1, callback.invocationCount); 1626 assertTrue(callback.failureCause instanceof StatusException); 1627 assertSame(SHUTDOWN_REASON, ((StatusException) callback.failureCause).getStatus()); 1628 1629 // now that handler is in terminal state, all future pings fail immediately 1630 callback = new PingCallbackImpl(); 1631 clientTransport.ping(callback, MoreExecutors.directExecutor()); 1632 assertEquals(1, getTransportStats(clientTransport).keepAlivesSent); 1633 assertEquals(1, callback.invocationCount); 1634 assertTrue(callback.failureCause instanceof StatusException); 1635 assertSame(SHUTDOWN_REASON, ((StatusException) callback.failureCause).getStatus()); 1636 shutdownAndVerify(); 1637 } 1638 1639 @Test ping_failsIfTransportFails()1640 public void ping_failsIfTransportFails() throws Exception { 1641 initTransport(); 1642 PingCallbackImpl callback = new PingCallbackImpl(); 1643 clientTransport.ping(callback, MoreExecutors.directExecutor()); 1644 assertEquals(1, getTransportStats(clientTransport).keepAlivesSent); 1645 assertEquals(0, callback.invocationCount); 1646 1647 clientTransport.onException(new IOException()); 1648 // ping failed on error 1649 assertEquals(1, callback.invocationCount); 1650 assertTrue(callback.failureCause instanceof StatusException); 1651 assertEquals(Status.Code.UNAVAILABLE, 1652 ((StatusException) callback.failureCause).getStatus().getCode()); 1653 1654 // now that handler is in terminal state, all future pings fail immediately 1655 callback = new PingCallbackImpl(); 1656 clientTransport.ping(callback, MoreExecutors.directExecutor()); 1657 assertEquals(1, getTransportStats(clientTransport).keepAlivesSent); 1658 assertEquals(1, callback.invocationCount); 1659 assertTrue(callback.failureCause instanceof StatusException); 1660 assertEquals(Status.Code.UNAVAILABLE, 1661 ((StatusException) callback.failureCause).getStatus().getCode()); 1662 shutdownAndVerify(); 1663 } 1664 1665 @Test shutdownDuringConnecting()1666 public void shutdownDuringConnecting() throws Exception { 1667 SettableFuture<Void> delayed = SettableFuture.create(); 1668 Runnable connectingCallback = () -> Futures.getUnchecked(delayed); 1669 startTransport( 1670 DEFAULT_START_STREAM_ID, 1671 connectingCallback, 1672 false, 1673 null); 1674 clientTransport.shutdown(SHUTDOWN_REASON); 1675 delayed.set(null); 1676 shutdownAndVerify(); 1677 } 1678 1679 @Test invalidAuthorityPropagates()1680 public void invalidAuthorityPropagates() { 1681 clientTransport = new OkHttpClientTransport( 1682 channelBuilder.buildTransportFactory(), 1683 new InetSocketAddress("localhost", 1234), 1684 "invalid_authority", 1685 "userAgent", 1686 EAG_ATTRS, 1687 NO_PROXY, 1688 tooManyPingsRunnable); 1689 1690 String host = clientTransport.getOverridenHost(); 1691 int port = clientTransport.getOverridenPort(); 1692 1693 assertEquals("invalid_authority", host); 1694 assertEquals(1234, port); 1695 } 1696 1697 @Test unreachableServer()1698 public void unreachableServer() throws Exception { 1699 clientTransport = new OkHttpClientTransport( 1700 channelBuilder.buildTransportFactory(), 1701 new InetSocketAddress("localhost", 0), 1702 "authority", 1703 "userAgent", 1704 EAG_ATTRS, 1705 NO_PROXY, 1706 tooManyPingsRunnable); 1707 1708 ManagedClientTransport.Listener listener = mock(ManagedClientTransport.Listener.class); 1709 clientTransport.start(listener); 1710 ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class); 1711 verify(listener, timeout(TIME_OUT_MS)).transportShutdown(captor.capture()); 1712 Status status = captor.getValue(); 1713 assertEquals(Status.UNAVAILABLE.getCode(), status.getCode()); 1714 assertTrue(status.getCause().toString(), status.getCause() instanceof IOException); 1715 1716 MockStreamListener streamListener = new MockStreamListener(); 1717 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers) 1718 .start(streamListener); 1719 streamListener.waitUntilStreamClosed(); 1720 assertEquals(Status.UNAVAILABLE.getCode(), streamListener.status.getCode()); 1721 } 1722 1723 @Test customSocketFactory()1724 public void customSocketFactory() throws Exception { 1725 RuntimeException exception = new RuntimeException("thrown by socket factory"); 1726 SocketFactory socketFactory = new RuntimeExceptionThrowingSocketFactory(exception); 1727 1728 clientTransport = 1729 new OkHttpClientTransport( 1730 channelBuilder.socketFactory(socketFactory).buildTransportFactory(), 1731 new InetSocketAddress("localhost", 0), 1732 "authority", 1733 "userAgent", 1734 EAG_ATTRS, 1735 NO_PROXY, 1736 tooManyPingsRunnable); 1737 1738 ManagedClientTransport.Listener listener = mock(ManagedClientTransport.Listener.class); 1739 clientTransport.start(listener); 1740 ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class); 1741 verify(listener, timeout(TIME_OUT_MS)).transportShutdown(captor.capture()); 1742 Status status = captor.getValue(); 1743 assertEquals(Status.UNAVAILABLE.getCode(), status.getCode()); 1744 assertSame(exception, status.getCause()); 1745 } 1746 1747 @Test proxy_200()1748 public void proxy_200() throws Exception { 1749 ServerSocket serverSocket = new ServerSocket(0); 1750 InetSocketAddress targetAddress = InetSocketAddress.createUnresolved("theservice", 80); 1751 clientTransport = new OkHttpClientTransport( 1752 channelBuilder.buildTransportFactory(), 1753 targetAddress, 1754 "authority", 1755 "userAgent", 1756 EAG_ATTRS, 1757 HttpConnectProxiedSocketAddress.newBuilder() 1758 .setTargetAddress(targetAddress) 1759 .setProxyAddress(new InetSocketAddress("localhost", serverSocket.getLocalPort())) 1760 .build(), 1761 tooManyPingsRunnable); 1762 clientTransport.start(transportListener); 1763 1764 Socket sock = serverSocket.accept(); 1765 serverSocket.close(); 1766 1767 BufferedReader reader = new BufferedReader(new InputStreamReader(sock.getInputStream(), UTF_8)); 1768 assertEquals("CONNECT theservice:80 HTTP/1.1", reader.readLine()); 1769 assertEquals("Host: theservice:80", reader.readLine()); 1770 while (!"".equals(reader.readLine())) {} 1771 1772 sock.getOutputStream().write("HTTP/1.1 200 OK\r\nServer: test\r\n\r\n".getBytes(UTF_8)); 1773 sock.getOutputStream().flush(); 1774 1775 assertEquals("PRI * HTTP/2.0", reader.readLine()); 1776 assertEquals("", reader.readLine()); 1777 assertEquals("SM", reader.readLine()); 1778 assertEquals("", reader.readLine()); 1779 1780 // Empty SETTINGS 1781 sock.getOutputStream().write(new byte[] {0, 0, 0, 0, 0x4, 0}); 1782 // GOAWAY 1783 sock.getOutputStream().write(new byte[] { 1784 0, 0, 0, 8, 0x7, 0, 1785 0, 0, 0, 0, // last stream id 1786 0, 0, 0, 0, // error code 1787 }); 1788 sock.getOutputStream().flush(); 1789 1790 verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(isA(Status.class)); 1791 while (sock.getInputStream().read() != -1) {} 1792 verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); 1793 sock.close(); 1794 } 1795 1796 @Test proxy_500()1797 public void proxy_500() throws Exception { 1798 ServerSocket serverSocket = new ServerSocket(0); 1799 InetSocketAddress targetAddress = InetSocketAddress.createUnresolved("theservice", 80); 1800 clientTransport = new OkHttpClientTransport( 1801 channelBuilder.buildTransportFactory(), 1802 targetAddress, 1803 "authority", 1804 "userAgent", 1805 EAG_ATTRS, 1806 HttpConnectProxiedSocketAddress.newBuilder() 1807 .setTargetAddress(targetAddress) 1808 .setProxyAddress(new InetSocketAddress("localhost", serverSocket.getLocalPort())) 1809 .build(), 1810 tooManyPingsRunnable); 1811 clientTransport.start(transportListener); 1812 1813 Socket sock = serverSocket.accept(); 1814 serverSocket.close(); 1815 1816 BufferedReader reader = new BufferedReader(new InputStreamReader(sock.getInputStream(), UTF_8)); 1817 assertEquals("CONNECT theservice:80 HTTP/1.1", reader.readLine()); 1818 assertEquals("Host: theservice:80", reader.readLine()); 1819 while (!"".equals(reader.readLine())) {} 1820 1821 final String errorText = "text describing error"; 1822 sock.getOutputStream().write("HTTP/1.1 500 OH NO\r\n\r\n".getBytes(UTF_8)); 1823 sock.getOutputStream().write(errorText.getBytes(UTF_8)); 1824 sock.getOutputStream().flush(); 1825 sock.shutdownOutput(); 1826 1827 assertEquals(-1, sock.getInputStream().read()); 1828 1829 ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class); 1830 verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(captor.capture()); 1831 Status error = captor.getValue(); 1832 assertTrue("Status didn't contain error code: " + captor.getValue(), 1833 error.getDescription().contains("500")); 1834 assertTrue("Status didn't contain error description: " + captor.getValue(), 1835 error.getDescription().contains("OH NO")); 1836 assertTrue("Status didn't contain error text: " + captor.getValue(), 1837 error.getDescription().contains(errorText)); 1838 assertEquals("Not UNAVAILABLE: " + captor.getValue(), 1839 Status.UNAVAILABLE.getCode(), error.getCode()); 1840 sock.close(); 1841 verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); 1842 } 1843 1844 @Test proxy_immediateServerClose()1845 public void proxy_immediateServerClose() throws Exception { 1846 ServerSocket serverSocket = new ServerSocket(0); 1847 InetSocketAddress targetAddress = InetSocketAddress.createUnresolved("theservice", 80); 1848 clientTransport = new OkHttpClientTransport( 1849 channelBuilder.buildTransportFactory(), 1850 targetAddress, 1851 "authority", 1852 "userAgent", 1853 EAG_ATTRS, 1854 HttpConnectProxiedSocketAddress.newBuilder() 1855 .setTargetAddress(targetAddress) 1856 .setProxyAddress(new InetSocketAddress("localhost", serverSocket.getLocalPort())) 1857 .build(), 1858 tooManyPingsRunnable); 1859 clientTransport.start(transportListener); 1860 1861 Socket sock = serverSocket.accept(); 1862 serverSocket.close(); 1863 sock.close(); 1864 1865 ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class); 1866 verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(captor.capture()); 1867 Status error = captor.getValue(); 1868 assertTrue("Status didn't contain proxy: " + captor.getValue(), 1869 error.getDescription().contains("proxy")); 1870 assertEquals("Not UNAVAILABLE: " + captor.getValue(), 1871 Status.UNAVAILABLE.getCode(), error.getCode()); 1872 verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); 1873 } 1874 1875 @Test proxy_serverHangs()1876 public void proxy_serverHangs() throws Exception { 1877 ServerSocket serverSocket = new ServerSocket(0); 1878 InetSocketAddress targetAddress = InetSocketAddress.createUnresolved("theservice", 80); 1879 clientTransport = new OkHttpClientTransport( 1880 channelBuilder.buildTransportFactory(), 1881 targetAddress, 1882 "authority", 1883 "userAgent", 1884 EAG_ATTRS, 1885 HttpConnectProxiedSocketAddress.newBuilder() 1886 .setTargetAddress(targetAddress) 1887 .setProxyAddress(new InetSocketAddress("localhost", serverSocket.getLocalPort())) 1888 .build(), 1889 tooManyPingsRunnable); 1890 clientTransport.proxySocketTimeout = 10; 1891 clientTransport.start(transportListener); 1892 1893 Socket sock = serverSocket.accept(); 1894 serverSocket.close(); 1895 1896 BufferedReader reader = new BufferedReader(new InputStreamReader(sock.getInputStream(), UTF_8)); 1897 assertEquals("CONNECT theservice:80 HTTP/1.1", reader.readLine()); 1898 assertEquals("Host: theservice:80", reader.readLine()); 1899 while (!"".equals(reader.readLine())) {} 1900 1901 verify(transportListener, timeout(200)).transportShutdown(any(Status.class)); 1902 verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); 1903 sock.close(); 1904 } 1905 1906 @Test goAway_notUtf8()1907 public void goAway_notUtf8() throws Exception { 1908 initTransport(); 1909 // 0xFF is never permitted in UTF-8. 0xF0 should have 3 continuations following, and 0x0a isn't 1910 // a continuation. 1911 frameHandler().goAway( 1912 0, ErrorCode.ENHANCE_YOUR_CALM, ByteString.of((byte) 0xFF, (byte) 0xF0, (byte) 0x0a)); 1913 1914 shutdownAndVerify(); 1915 } 1916 1917 @Test goAway_notTooManyPings()1918 public void goAway_notTooManyPings() throws Exception { 1919 final AtomicBoolean run = new AtomicBoolean(); 1920 tooManyPingsRunnable = new Runnable() { 1921 @Override 1922 public void run() { 1923 run.set(true); 1924 } 1925 }; 1926 initTransport(); 1927 frameHandler().goAway(0, ErrorCode.ENHANCE_YOUR_CALM, ByteString.encodeUtf8("not_many_pings")); 1928 assertFalse(run.get()); 1929 1930 shutdownAndVerify(); 1931 } 1932 1933 @Test goAway_tooManyPings()1934 public void goAway_tooManyPings() throws Exception { 1935 final AtomicBoolean run = new AtomicBoolean(); 1936 tooManyPingsRunnable = new Runnable() { 1937 @Override 1938 public void run() { 1939 run.set(true); 1940 } 1941 }; 1942 initTransport(); 1943 frameHandler().goAway(0, ErrorCode.ENHANCE_YOUR_CALM, ByteString.encodeUtf8("too_many_pings")); 1944 assertTrue(run.get()); 1945 1946 shutdownAndVerify(); 1947 } 1948 1949 @Test goAway_streamListenerRpcProgress()1950 public void goAway_streamListenerRpcProgress() throws Exception { 1951 initTransport(); 1952 setMaxConcurrentStreams(2); 1953 MockStreamListener listener1 = new MockStreamListener(); 1954 MockStreamListener listener2 = new MockStreamListener(); 1955 MockStreamListener listener3 = new MockStreamListener(); 1956 OkHttpClientStream stream1 = 1957 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 1958 stream1.start(listener1); 1959 OkHttpClientStream stream2 = 1960 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 1961 stream2.start(listener2); 1962 OkHttpClientStream stream3 = 1963 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 1964 stream3.start(listener3); 1965 waitForStreamPending(1); 1966 1967 assertEquals(2, activeStreamCount()); 1968 assertContainStream(DEFAULT_START_STREAM_ID); 1969 assertContainStream(DEFAULT_START_STREAM_ID + 2); 1970 1971 frameHandler() 1972 .goAway(DEFAULT_START_STREAM_ID, ErrorCode.CANCEL, ByteString.encodeUtf8("blablabla")); 1973 1974 listener2.waitUntilStreamClosed(); 1975 listener3.waitUntilStreamClosed(); 1976 assertNull(listener1.rpcProgress); 1977 assertEquals(REFUSED, listener2.rpcProgress); 1978 assertEquals(MISCARRIED, listener3.rpcProgress); 1979 assertEquals(1, activeStreamCount()); 1980 assertContainStream(DEFAULT_START_STREAM_ID); 1981 1982 getStream(DEFAULT_START_STREAM_ID).cancel(Status.CANCELLED); 1983 1984 listener1.waitUntilStreamClosed(); 1985 assertEquals(PROCESSED, listener1.rpcProgress); 1986 1987 shutdownAndVerify(); 1988 } 1989 1990 @Test reset_streamListenerRpcProgress()1991 public void reset_streamListenerRpcProgress() throws Exception { 1992 initTransport(); 1993 MockStreamListener listener1 = new MockStreamListener(); 1994 MockStreamListener listener2 = new MockStreamListener(); 1995 MockStreamListener listener3 = new MockStreamListener(); 1996 OkHttpClientStream stream1 = 1997 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 1998 stream1.start(listener1); 1999 OkHttpClientStream stream2 = 2000 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 2001 stream2.start(listener2); 2002 OkHttpClientStream stream3 = 2003 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 2004 stream3.start(listener3); 2005 2006 assertEquals(3, activeStreamCount()); 2007 assertContainStream(DEFAULT_START_STREAM_ID); 2008 assertContainStream(DEFAULT_START_STREAM_ID + 2); 2009 assertContainStream(DEFAULT_START_STREAM_ID + 4); 2010 2011 frameHandler().rstStream(DEFAULT_START_STREAM_ID + 2, ErrorCode.REFUSED_STREAM); 2012 2013 listener2.waitUntilStreamClosed(); 2014 assertNull(listener1.rpcProgress); 2015 assertEquals(REFUSED, listener2.rpcProgress); 2016 assertNull(listener3.rpcProgress); 2017 2018 frameHandler().rstStream(DEFAULT_START_STREAM_ID, ErrorCode.CANCEL); 2019 listener1.waitUntilStreamClosed(); 2020 assertEquals(PROCESSED, listener1.rpcProgress); 2021 assertNull(listener3.rpcProgress); 2022 2023 getStream(DEFAULT_START_STREAM_ID + 4).cancel(Status.CANCELLED); 2024 2025 listener3.waitUntilStreamClosed(); 2026 assertEquals(PROCESSED, listener3.rpcProgress); 2027 2028 shutdownAndVerify(); 2029 } 2030 2031 @Test shutdownNow_streamListenerRpcProgress()2032 public void shutdownNow_streamListenerRpcProgress() throws Exception { 2033 initTransport(); 2034 setMaxConcurrentStreams(2); 2035 MockStreamListener listener1 = new MockStreamListener(); 2036 MockStreamListener listener2 = new MockStreamListener(); 2037 MockStreamListener listener3 = new MockStreamListener(); 2038 OkHttpClientStream stream1 = 2039 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 2040 stream1.start(listener1); 2041 OkHttpClientStream stream2 = 2042 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 2043 stream2.start(listener2); 2044 OkHttpClientStream stream3 = 2045 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 2046 stream3.start(listener3); 2047 waitForStreamPending(1); 2048 2049 assertEquals(2, activeStreamCount()); 2050 assertContainStream(DEFAULT_START_STREAM_ID); 2051 assertContainStream(DEFAULT_START_STREAM_ID + 2); 2052 2053 clientTransport.shutdownNow(Status.INTERNAL); 2054 2055 listener1.waitUntilStreamClosed(); 2056 listener2.waitUntilStreamClosed(); 2057 listener3.waitUntilStreamClosed(); 2058 2059 assertEquals(PROCESSED, listener1.rpcProgress); 2060 assertEquals(PROCESSED, listener2.rpcProgress); 2061 assertEquals(MISCARRIED, listener3.rpcProgress); 2062 } 2063 activeStreamCount()2064 private int activeStreamCount() { 2065 return clientTransport.getActiveStreams().length; 2066 } 2067 getStream(int streamId)2068 private OkHttpClientStream getStream(int streamId) { 2069 return clientTransport.getStream(streamId); 2070 } 2071 assertContainStream(int streamId)2072 void assertContainStream(int streamId) { 2073 assertNotNull(clientTransport.getStream(streamId)); 2074 } 2075 frameHandler()2076 private ClientFrameHandler frameHandler() throws Exception { 2077 return clientTransport.getHandler(); 2078 } 2079 waitForStreamPending(int expected)2080 private void waitForStreamPending(int expected) throws Exception { 2081 int duration = TIME_OUT_MS / 10; 2082 for (int i = 0; i < 10; i++) { 2083 if (clientTransport.getPendingStreamSize() == expected) { 2084 return; 2085 } 2086 Thread.sleep(duration); 2087 } 2088 assertEquals(expected, clientTransport.getPendingStreamSize()); 2089 } 2090 assertNewStreamFail()2091 private void assertNewStreamFail() throws Exception { 2092 MockStreamListener listener = new MockStreamListener(); 2093 OkHttpClientStream stream = 2094 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); 2095 stream.start(listener); 2096 listener.waitUntilStreamClosed(); 2097 assertFalse(listener.status.isOk()); 2098 } 2099 setMaxConcurrentStreams(int num)2100 private void setMaxConcurrentStreams(int num) throws Exception { 2101 Settings settings = new Settings(); 2102 OkHttpSettingsUtil.set(settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS, num); 2103 frameHandler().settings(false, settings); 2104 } 2105 setInitialWindowSize(int size)2106 private void setInitialWindowSize(int size) throws Exception { 2107 Settings settings = new Settings(); 2108 OkHttpSettingsUtil.set(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE, size); 2109 frameHandler().settings(false, settings); 2110 } 2111 createMessageFrame(String message)2112 private static Buffer createMessageFrame(String message) { 2113 return createMessageFrame(message.getBytes(UTF_8)); 2114 } 2115 createMessageFrame(byte[] message)2116 private static Buffer createMessageFrame(byte[] message) { 2117 Buffer buffer = new Buffer(); 2118 buffer.writeByte(0 /* UNCOMPRESSED */); 2119 buffer.writeInt(message.length); 2120 buffer.write(message); 2121 return buffer; 2122 } 2123 grpcResponseHeaders()2124 private List<Header> grpcResponseHeaders() { 2125 return ImmutableList.of( 2126 new Header(":status", "200"), 2127 CONTENT_TYPE_HEADER); 2128 } 2129 grpcResponseTrailers()2130 private List<Header> grpcResponseTrailers() { 2131 return ImmutableList.of( 2132 new Header(InternalStatus.CODE_KEY.name(), "0"), 2133 // Adding Content-Type and :status for testing responses with only a single HEADERS frame. 2134 new Header(":status", "200"), 2135 CONTENT_TYPE_HEADER); 2136 } 2137 anyListHeader()2138 private static List<Header> anyListHeader() { 2139 return any(); 2140 } 2141 2142 private static class MockFrameReader implements FrameReader { 2143 final CountDownLatch closed = new CountDownLatch(1); 2144 2145 enum Result { 2146 THROW_EXCEPTION, 2147 RETURN_FALSE, 2148 THROW_ERROR 2149 } 2150 2151 final LinkedBlockingQueue<Result> nextResults = new LinkedBlockingQueue<>(); 2152 2153 @Override close()2154 public void close() throws IOException { 2155 closed.countDown(); 2156 } 2157 assertClosed()2158 void assertClosed() { 2159 try { 2160 if (!closed.await(TIME_OUT_MS, TimeUnit.MILLISECONDS)) { 2161 fail("Failed waiting frame reader to be closed."); 2162 } 2163 } catch (InterruptedException e) { 2164 Thread.currentThread().interrupt(); 2165 fail("Interrupted while waiting for frame reader to be closed."); 2166 } 2167 } 2168 2169 // The wait is safe; nextFrame is called in a loop and can have spurious wakeups 2170 @SuppressWarnings("WaitNotInLoop") 2171 @Override nextFrame(FrameReader.Handler handler)2172 public boolean nextFrame(FrameReader.Handler handler) throws IOException { 2173 Result result; 2174 try { 2175 result = nextResults.take(); 2176 } catch (InterruptedException e) { 2177 Thread.currentThread().interrupt(); 2178 throw new IOException(e); 2179 } 2180 switch (result) { 2181 case THROW_EXCEPTION: 2182 throw new IOException(NETWORK_ISSUE_MESSAGE); 2183 case RETURN_FALSE: 2184 return false; 2185 case THROW_ERROR: 2186 throw new Error(ERROR_MESSAGE); 2187 default: 2188 throw new UnsupportedOperationException("unimplemented: " + result); 2189 } 2190 } 2191 throwIoExceptionForNextFrame()2192 void throwIoExceptionForNextFrame() { 2193 nextResults.add(Result.THROW_EXCEPTION); 2194 } 2195 throwErrorForNextFrame()2196 void throwErrorForNextFrame() { 2197 nextResults.add(Result.THROW_ERROR); 2198 } 2199 nextFrameAtEndOfStream()2200 void nextFrameAtEndOfStream() { 2201 nextResults.add(Result.RETURN_FALSE); 2202 } 2203 2204 @Override readConnectionPreface()2205 public void readConnectionPreface() throws IOException { 2206 // not used. 2207 } 2208 } 2209 2210 private static class MockStreamListener implements ClientStreamListener { 2211 Status status; 2212 Metadata headers; 2213 Metadata trailers; 2214 RpcProgress rpcProgress; 2215 CountDownLatch closed = new CountDownLatch(1); 2216 ArrayList<String> messages = new ArrayList<>(); 2217 boolean onReadyCalled; 2218 MockStreamListener()2219 MockStreamListener() { 2220 } 2221 2222 @Override headersRead(Metadata headers)2223 public void headersRead(Metadata headers) { 2224 this.headers = headers; 2225 } 2226 2227 @Override messagesAvailable(MessageProducer producer)2228 public void messagesAvailable(MessageProducer producer) { 2229 InputStream inputStream; 2230 while ((inputStream = producer.next()) != null) { 2231 String msg = getContent(inputStream); 2232 if (msg != null) { 2233 messages.add(msg); 2234 } 2235 } 2236 } 2237 2238 @Override closed(Status status, RpcProgress rpcProgress, Metadata trailers)2239 public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) { 2240 this.status = status; 2241 this.trailers = trailers; 2242 this.rpcProgress = rpcProgress; 2243 closed.countDown(); 2244 } 2245 2246 @Override onReady()2247 public void onReady() { 2248 onReadyCalled = true; 2249 } 2250 isOnReadyCalled()2251 boolean isOnReadyCalled() { 2252 boolean value = onReadyCalled; 2253 onReadyCalled = false; 2254 return value; 2255 } 2256 waitUntilStreamClosed()2257 void waitUntilStreamClosed() throws InterruptedException { 2258 if (!closed.await(TIME_OUT_MS, TimeUnit.MILLISECONDS)) { 2259 fail("Failed waiting stream to be closed."); 2260 } 2261 } 2262 2263 @SuppressWarnings("Finally") // We don't care about suppressed exceptions in the test getContent(InputStream message)2264 static String getContent(InputStream message) { 2265 BufferedReader br = new BufferedReader(new InputStreamReader(message, UTF_8)); 2266 try { 2267 // Only one line message is used in this test. 2268 return br.readLine(); 2269 } catch (IOException e) { 2270 return null; 2271 } finally { 2272 try { 2273 message.close(); 2274 } catch (IOException e) { 2275 // Ignore 2276 } 2277 } 2278 } 2279 } 2280 2281 private static class MockSocket extends Socket { 2282 final MockFrameReader frameReader; 2283 private final PipedOutputStream outputStream = new PipedOutputStream(); 2284 private final PipedInputStream outputStreamSink = new PipedInputStream(); 2285 private final PipedOutputStream inputStreamSource = new PipedOutputStream(); 2286 private final PipedInputStream inputStream = new PipedInputStream(); 2287 MockSocket(MockFrameReader frameReader)2288 MockSocket(MockFrameReader frameReader) { 2289 this.frameReader = frameReader; 2290 try { 2291 outputStreamSink.connect(outputStream); 2292 inputStream.connect(inputStreamSource); 2293 } catch (IOException ex) { 2294 throw new AssertionError(ex); 2295 } 2296 } 2297 2298 @Override close()2299 public synchronized void close() { 2300 frameReader.nextFrameAtEndOfStream(); 2301 } 2302 2303 @Override getLocalSocketAddress()2304 public SocketAddress getLocalSocketAddress() { 2305 return InetSocketAddress.createUnresolved("localhost", 4000); 2306 } 2307 2308 @Override getOutputStream()2309 public OutputStream getOutputStream() { 2310 return outputStream; 2311 } 2312 2313 @Override getInputStream()2314 public InputStream getInputStream() { 2315 return inputStream; 2316 } 2317 } 2318 2319 static class PingCallbackImpl implements ClientTransport.PingCallback { 2320 int invocationCount; 2321 long roundTripTime; 2322 Throwable failureCause; 2323 2324 @Override onSuccess(long roundTripTimeNanos)2325 public void onSuccess(long roundTripTimeNanos) { 2326 invocationCount++; 2327 this.roundTripTime = roundTripTimeNanos; 2328 } 2329 2330 @Override onFailure(Throwable cause)2331 public void onFailure(Throwable cause) { 2332 invocationCount++; 2333 this.failureCause = cause; 2334 } 2335 } 2336 shutdownAndVerify()2337 private void shutdownAndVerify() { 2338 clientTransport.shutdown(SHUTDOWN_REASON); 2339 assertEquals(0, activeStreamCount()); 2340 try { 2341 verify(frameWriter, timeout(TIME_OUT_MS)).close(); 2342 } catch (IOException e) { 2343 throw new RuntimeException(e); 2344 } 2345 frameReader.assertClosed(); 2346 } 2347 getTransportStats(InternalInstrumented<SocketStats> obj)2348 private static TransportStats getTransportStats(InternalInstrumented<SocketStats> obj) 2349 throws ExecutionException, InterruptedException { 2350 return obj.getStats().get().data; 2351 } 2352 2353 /** A FrameWriter to mock with CALL_REAL_METHODS option. */ 2354 private static class MockFrameWriter implements FrameWriter { 2355 2356 private Socket socket; 2357 private final Queue<Buffer> capturedBuffer; 2358 MockFrameWriter(Socket socket, Queue<Buffer> capturedBuffer)2359 public MockFrameWriter(Socket socket, Queue<Buffer> capturedBuffer) { 2360 // Sets a socket to close. Some tests assumes that FrameWriter will close underlying sink 2361 // which will eventually close the socket. 2362 this.socket = socket; 2363 this.capturedBuffer = capturedBuffer; 2364 } 2365 2366 @Override close()2367 public void close() throws IOException { 2368 socket.close(); 2369 } 2370 2371 @Override maxDataLength()2372 public int maxDataLength() { 2373 return Integer.MAX_VALUE; 2374 } 2375 2376 @Override data(boolean outFinished, int streamId, Buffer source, int byteCount)2377 public void data(boolean outFinished, int streamId, Buffer source, int byteCount) 2378 throws IOException { 2379 // simulate the side effect, and captures to internal queue. 2380 Buffer capture = new Buffer(); 2381 capture.write(source, byteCount); 2382 capturedBuffer.add(capture); 2383 } 2384 2385 // rest of methods are unimplemented 2386 2387 @Override connectionPreface()2388 public void connectionPreface() throws IOException {} 2389 2390 @Override ackSettings(Settings peerSettings)2391 public void ackSettings(Settings peerSettings) throws IOException {} 2392 2393 @Override pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)2394 public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders) 2395 throws IOException {} 2396 2397 @Override flush()2398 public void flush() throws IOException {} 2399 2400 @Override synStream(boolean outFinished, boolean inFinished, int streamId, int associatedStreamId, List<Header> headerBlock)2401 public void synStream(boolean outFinished, boolean inFinished, int streamId, 2402 int associatedStreamId, List<Header> headerBlock) throws IOException {} 2403 2404 @Override synReply(boolean outFinished, int streamId, List<Header> headerBlock)2405 public void synReply(boolean outFinished, int streamId, List<Header> headerBlock) 2406 throws IOException {} 2407 2408 @Override headers(int streamId, List<Header> headerBlock)2409 public void headers(int streamId, List<Header> headerBlock) throws IOException {} 2410 2411 @Override rstStream(int streamId, ErrorCode errorCode)2412 public void rstStream(int streamId, ErrorCode errorCode) throws IOException {} 2413 2414 @Override settings(Settings okHttpSettings)2415 public void settings(Settings okHttpSettings) throws IOException {} 2416 2417 @Override ping(boolean ack, int payload1, int payload2)2418 public void ping(boolean ack, int payload1, int payload2) throws IOException {} 2419 2420 @Override goAway(int lastGoodStreamId, ErrorCode errorCode, byte[] debugData)2421 public void goAway(int lastGoodStreamId, ErrorCode errorCode, byte[] debugData) 2422 throws IOException {} 2423 2424 @Override windowUpdate(int streamId, long windowSizeIncrement)2425 public void windowUpdate(int streamId, long windowSizeIncrement) throws IOException {} 2426 } 2427 2428 private static class RuntimeExceptionThrowingSocketFactory extends SocketFactory { 2429 RuntimeException exception; 2430 RuntimeExceptionThrowingSocketFactory(RuntimeException exception)2431 private RuntimeExceptionThrowingSocketFactory(RuntimeException exception) { 2432 this.exception = exception; 2433 } 2434 2435 @Override createSocket(String s, int i)2436 public Socket createSocket(String s, int i) { 2437 throw exception; 2438 } 2439 2440 @Override createSocket(String s, int i, InetAddress inetAddress, int i1)2441 public Socket createSocket(String s, int i, InetAddress inetAddress, int i1) { 2442 throw exception; 2443 } 2444 2445 @Override createSocket(InetAddress inetAddress, int i)2446 public Socket createSocket(InetAddress inetAddress, int i) { 2447 throw exception; 2448 } 2449 2450 @Override createSocket(InetAddress inetAddress, int i, InetAddress inetAddress1, int i1)2451 public Socket createSocket(InetAddress inetAddress, int i, InetAddress inetAddress1, int i1) { 2452 throw exception; 2453 } 2454 } 2455 2456 static class FakeSocketFactory extends SocketFactory { 2457 private Socket socket; 2458 FakeSocketFactory(Socket socket)2459 public FakeSocketFactory(Socket socket) { 2460 this.socket = Preconditions.checkNotNull(socket, "socket"); 2461 } 2462 createSocket()2463 @Override public Socket createSocket() { 2464 Preconditions.checkNotNull(this.socket, "socket"); 2465 Socket socket = this.socket; 2466 this.socket = null; 2467 return socket; 2468 } 2469 createSocket(InetAddress host, int port)2470 @Override public Socket createSocket(InetAddress host, int port) { 2471 return createSocket(); 2472 } 2473 createSocket( InetAddress host, int port, InetAddress localAddress, int localPort)2474 @Override public Socket createSocket( 2475 InetAddress host, int port, InetAddress localAddress, int localPort) { 2476 return createSocket(); 2477 } 2478 createSocket(String host, int port)2479 @Override public Socket createSocket(String host, int port) { 2480 return createSocket(); 2481 } 2482 createSocket( String host, int port, InetAddress localHost, int localPort)2483 @Override public Socket createSocket( 2484 String host, int port, InetAddress localHost, int localPort) { 2485 return createSocket(); 2486 } 2487 } 2488 2489 static class FakeVariant implements Variant { 2490 private FrameReader frameReader; 2491 private FrameWriter frameWriter; 2492 FakeVariant(FrameReader frameReader, FrameWriter frameWriter)2493 public FakeVariant(FrameReader frameReader, FrameWriter frameWriter) { 2494 this.frameReader = frameReader; 2495 this.frameWriter = frameWriter; 2496 } 2497 getProtocol()2498 @Override public Protocol getProtocol() { 2499 return Protocol.HTTP_2; 2500 } 2501 newReader(BufferedSource source, boolean client)2502 @Override public FrameReader newReader(BufferedSource source, boolean client) { 2503 Preconditions.checkNotNull(this.frameReader, "frameReader"); 2504 FrameReader frameReader = this.frameReader; 2505 this.frameReader = null; 2506 return frameReader; 2507 } 2508 newWriter(BufferedSink sink, boolean client)2509 @Override public FrameWriter newWriter(BufferedSink sink, boolean client) { 2510 Preconditions.checkNotNull(this.frameWriter, "frameWriter"); 2511 FrameWriter frameWriter = this.frameWriter; 2512 this.frameWriter = null; 2513 return frameWriter; 2514 } 2515 } 2516 } 2517