xref: /aosp_15_r20/external/grpc-grpc-java/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java (revision e07d83d3ffcef9ecfc9f7f475418ec639ff0e5fe)
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.okhttp;
18 
19 import static com.google.common.base.Charsets.UTF_8;
20 import static com.google.common.truth.Truth.assertThat;
21 import static io.grpc.internal.ClientStreamListener.RpcProgress.MISCARRIED;
22 import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED;
23 import static io.grpc.internal.ClientStreamListener.RpcProgress.REFUSED;
24 import static io.grpc.okhttp.Headers.CONTENT_TYPE_HEADER;
25 import static io.grpc.okhttp.Headers.HTTP_SCHEME_HEADER;
26 import static io.grpc.okhttp.Headers.METHOD_HEADER;
27 import static io.grpc.okhttp.Headers.TE_HEADER;
28 import static org.junit.Assert.assertEquals;
29 import static org.junit.Assert.assertFalse;
30 import static org.junit.Assert.assertNotNull;
31 import static org.junit.Assert.assertNull;
32 import static org.junit.Assert.assertSame;
33 import static org.junit.Assert.assertTrue;
34 import static org.junit.Assert.fail;
35 import static org.mockito.ArgumentMatchers.any;
36 import static org.mockito.ArgumentMatchers.anyBoolean;
37 import static org.mockito.ArgumentMatchers.anyInt;
38 import static org.mockito.ArgumentMatchers.eq;
39 import static org.mockito.ArgumentMatchers.isA;
40 import static org.mockito.ArgumentMatchers.same;
41 import static org.mockito.Mockito.inOrder;
42 import static org.mockito.Mockito.mock;
43 import static org.mockito.Mockito.never;
44 import static org.mockito.Mockito.reset;
45 import static org.mockito.Mockito.timeout;
46 import static org.mockito.Mockito.verify;
47 import static org.mockito.Mockito.verifyNoInteractions;
48 
49 import com.google.common.base.Preconditions;
50 import com.google.common.base.Stopwatch;
51 import com.google.common.base.Supplier;
52 import com.google.common.base.Ticker;
53 import com.google.common.collect.ImmutableList;
54 import com.google.common.util.concurrent.Futures;
55 import com.google.common.util.concurrent.MoreExecutors;
56 import com.google.common.util.concurrent.SettableFuture;
57 import io.grpc.Attributes;
58 import io.grpc.CallOptions;
59 import io.grpc.ClientStreamTracer;
60 import io.grpc.HttpConnectProxiedSocketAddress;
61 import io.grpc.InternalChannelz.SocketStats;
62 import io.grpc.InternalChannelz.TransportStats;
63 import io.grpc.InternalInstrumented;
64 import io.grpc.InternalStatus;
65 import io.grpc.Metadata;
66 import io.grpc.MethodDescriptor;
67 import io.grpc.MethodDescriptor.MethodType;
68 import io.grpc.Status;
69 import io.grpc.Status.Code;
70 import io.grpc.StatusException;
71 import io.grpc.internal.AbstractStream;
72 import io.grpc.internal.ClientStreamListener;
73 import io.grpc.internal.ClientTransport;
74 import io.grpc.internal.FakeClock;
75 import io.grpc.internal.GrpcUtil;
76 import io.grpc.internal.ManagedClientTransport;
77 import io.grpc.okhttp.OkHttpClientTransport.ClientFrameHandler;
78 import io.grpc.okhttp.OkHttpFrameLogger.Direction;
79 import io.grpc.okhttp.internal.Protocol;
80 import io.grpc.okhttp.internal.framed.ErrorCode;
81 import io.grpc.okhttp.internal.framed.FrameReader;
82 import io.grpc.okhttp.internal.framed.FrameWriter;
83 import io.grpc.okhttp.internal.framed.Header;
84 import io.grpc.okhttp.internal.framed.HeadersMode;
85 import io.grpc.okhttp.internal.framed.Settings;
86 import io.grpc.okhttp.internal.framed.Variant;
87 import io.grpc.testing.TestMethodDescriptors;
88 import java.io.BufferedReader;
89 import java.io.ByteArrayInputStream;
90 import java.io.IOException;
91 import java.io.InputStream;
92 import java.io.InputStreamReader;
93 import java.io.OutputStream;
94 import java.io.PipedInputStream;
95 import java.io.PipedOutputStream;
96 import java.net.InetAddress;
97 import java.net.InetSocketAddress;
98 import java.net.ServerSocket;
99 import java.net.Socket;
100 import java.net.SocketAddress;
101 import java.util.ArrayDeque;
102 import java.util.ArrayList;
103 import java.util.Arrays;
104 import java.util.List;
105 import java.util.Queue;
106 import java.util.concurrent.CountDownLatch;
107 import java.util.concurrent.ExecutionException;
108 import java.util.concurrent.ExecutorService;
109 import java.util.concurrent.Executors;
110 import java.util.concurrent.LinkedBlockingQueue;
111 import java.util.concurrent.TimeUnit;
112 import java.util.concurrent.atomic.AtomicBoolean;
113 import java.util.logging.Handler;
114 import java.util.logging.Level;
115 import java.util.logging.LogRecord;
116 import java.util.logging.Logger;
117 import javax.annotation.Nullable;
118 import javax.net.SocketFactory;
119 import okio.Buffer;
120 import okio.BufferedSink;
121 import okio.BufferedSource;
122 import okio.ByteString;
123 import org.junit.After;
124 import org.junit.Rule;
125 import org.junit.Test;
126 import org.junit.rules.Timeout;
127 import org.junit.runner.RunWith;
128 import org.junit.runners.JUnit4;
129 import org.mockito.AdditionalAnswers;
130 import org.mockito.ArgumentCaptor;
131 import org.mockito.ArgumentMatchers;
132 import org.mockito.InOrder;
133 import org.mockito.Mock;
134 import org.mockito.junit.MockitoJUnit;
135 import org.mockito.junit.MockitoRule;
136 
137 /**
138  * Tests for {@link OkHttpClientTransport}.
139  */
140 @RunWith(JUnit4.class)
141 public class OkHttpClientTransportTest {
  // Timeout, in milliseconds, used when waiting for the transport to connect and for Mockito
  // timeout(...) verifications throughout this class.
  private static final int TIME_OUT_MS = 2000;
  // Flow-control window configured on the channel builder below.
  private static final int INITIAL_WINDOW_SIZE = 65535;
  // Cause message the tests expect after the mock frame reader is told to throw an IOException.
  private static final String NETWORK_ISSUE_MESSAGE = "network issue";
  // Cause message the tests expect after the mock frame reader is told to throw an Error.
  private static final String ERROR_MESSAGE = "simulated error";
  // The gRPC header length, which includes 1 byte compression flag and 4 bytes message length.
  private static final int HEADER_LENGTH = 5;
  private static final Status SHUTDOWN_REASON = Status.UNAVAILABLE.withDescription("for test");
  // Named null: passed where a proxy address is expected to make "no proxy" explicit.
  private static final HttpConnectProxiedSocketAddress NO_PROXY = null;
  // Stream id the tests expect for the first stream; startTransport() only overrides the
  // transport's next stream id when a different start id is requested.
  private static final int DEFAULT_START_STREAM_ID = 3;
  private static final Attributes EAG_ATTRS = Attributes.EMPTY;
  // The logger named after OkHttpClientTransport (not this test class), so tests can attach
  // handlers to it and observe what is logged under that name.
  private static final Logger logger = Logger.getLogger(OkHttpClientTransport.class.getName());
  // A single no-op tracer handed to every newStream() call.
  private static final ClientStreamTracer[] tracers = new ClientStreamTracer[] {
      new ClientStreamTracer() {}
  };

  // Fails any test that runs longer than 10 seconds.
  @Rule public final Timeout globalTimeout = Timeout.seconds(10);
  // Initialises the @Mock fields below.
  @Rule public final MockitoRule mocks = MockitoJUnit.rule();

  private MethodDescriptor<Void, Void> method = TestMethodDescriptors.voidMethod();

  @Mock
  private ManagedClientTransport.Listener transportListener;

  // NOTE: the declaration order of the next fields matters: frameReader feeds socket, and
  // socket plus capturedBuffer feed the frameWriter delegate.
  private final Queue<Buffer> capturedBuffer = new ArrayDeque<>();
  private OkHttpClientTransport clientTransport;
  private final MockFrameReader frameReader = new MockFrameReader();
  private final Socket socket = new MockSocket(frameReader);
  // Mockito mock that forwards every call to a MockFrameWriter, so calls can be both verified
  // and actually executed.
  private final FrameWriter frameWriter = mock(FrameWriter.class, AdditionalAnswers.delegatesTo(
      new MockFrameWriter(socket, capturedBuffer)));
  // Used as the transport executor; shut down in tearDown().
  private ExecutorService executor = Executors.newCachedThreadPool();
  private long nanoTime; // backs a ticker, for testing ping round-trip time measurement
  // Completed by the transport under test; startTransport() can block on it.
  private SettableFuture<Void> connectedFuture;
  // Throws AssertionError if it is ever run.
  private Runnable tooManyPingsRunnable = new Runnable() {
    @Override public void run() {
      throw new AssertionError();
    }
  };
  // Shared builder; individual tests tweak it before calling initTransport()/startTransport().
  private OkHttpChannelBuilder channelBuilder = OkHttpChannelBuilder.forAddress("127.0.0.1", 1234)
      .usePlaintext()
      .executor(new FakeClock().getScheduledExecutorService()) // Executor unused
      .scheduledExecutorService(new FakeClock().getScheduledExecutorService()) // Executor unused
      .transportExecutor(executor)
      .flowControlWindow(INITIAL_WINDOW_SIZE);
185 
186   @After
tearDown()187   public void tearDown() {
188     executor.shutdownNow();
189   }
190 
initTransport()191   private void initTransport() throws Exception {
192     startTransport(
193         DEFAULT_START_STREAM_ID, null, true, null);
194   }
195 
initTransport(int startId)196   private void initTransport(int startId) throws Exception {
197     startTransport(startId, null, true, null);
198   }
199 
startTransport(int startId, @Nullable Runnable connectingCallback, boolean waitingForConnected, String userAgent)200   private void startTransport(int startId, @Nullable Runnable connectingCallback,
201       boolean waitingForConnected, String userAgent)
202       throws Exception {
203     connectedFuture = SettableFuture.create();
204     final Ticker ticker = new Ticker() {
205       @Override
206       public long read() {
207         return nanoTime;
208       }
209     };
210     Supplier<Stopwatch> stopwatchSupplier = new Supplier<Stopwatch>() {
211       @Override
212       public Stopwatch get() {
213         return Stopwatch.createUnstarted(ticker);
214       }
215     };
216     channelBuilder.socketFactory(new FakeSocketFactory(socket));
217     clientTransport = new OkHttpClientTransport(
218         channelBuilder.buildTransportFactory(),
219         userAgent,
220         stopwatchSupplier,
221         new FakeVariant(frameReader, frameWriter),
222         connectingCallback,
223         connectedFuture,
224         tooManyPingsRunnable);
225     clientTransport.start(transportListener);
226     if (waitingForConnected) {
227       connectedFuture.get(TIME_OUT_MS, TimeUnit.MILLISECONDS);
228     }
229     if (startId != DEFAULT_START_STREAM_ID) {
230       clientTransport.setNextStreamId(startId);
231     }
232   }
233 
234   @Test
testToString()235   public void testToString() throws Exception {
236     InetSocketAddress address = InetSocketAddress.createUnresolved("hostname", 31415);
237     clientTransport = new OkHttpClientTransport(
238         channelBuilder.buildTransportFactory(),
239         address,
240         "hostname",
241         /*userAgent=*/ null,
242         EAG_ATTRS,
243         NO_PROXY,
244         tooManyPingsRunnable);
245     String s = clientTransport.toString();
246     assertTrue("Unexpected: " + s, s.contains("OkHttpClientTransport"));
247     assertTrue("Unexpected: " + s, s.contains(address.toString()));
248   }
249 
250   /**
251    * Test logging is functioning correctly for client received Http/2 frames. Not intended to test
252    * actual frame content being logged.
253    */
254   @Test
testClientHandlerFrameLogger()255   public void testClientHandlerFrameLogger() throws Exception {
256     final List<LogRecord> logs = new ArrayList<>();
257     Handler handler = new Handler() {
258       @Override
259       public void publish(LogRecord record) {
260         logs.add(record);
261       }
262 
263       @Override
264       public void flush() {
265       }
266 
267       @Override
268       public void close() throws SecurityException {
269       }
270     };
271     logger.addHandler(handler);
272     logger.setLevel(Level.ALL);
273 
274     initTransport();
275     assertThat(logs).hasSize(1);
276     LogRecord log = logs.remove(0);
277     assertThat(log.getMessage()).startsWith(Direction.OUTBOUND + " SETTINGS: ack=false");
278     assertThat(log.getLevel()).isEqualTo(Level.FINE);
279 
280     MockStreamListener listener = new MockStreamListener();
281     OkHttpClientStream stream =
282         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
283     stream.start(listener);
284     stream.request(1);
285 
286     frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
287     assertThat(logs).hasSize(1);
288     log = logs.remove(0);
289     assertThat(log.getMessage()).startsWith(Direction.INBOUND + " HEADERS: streamId=" + 3);
290     assertThat(log.getLevel()).isEqualTo(Level.FINE);
291 
292     final String message = "Hello Client";
293     Buffer buffer = createMessageFrame(message);
294     frameHandler().data(false, 3, buffer, (int) buffer.size());
295     assertThat(logs).hasSize(1);
296     log = logs.remove(0);
297     assertThat(log.getMessage()).startsWith(Direction.INBOUND + " DATA: streamId=" + 3);
298     assertThat(log.getLevel()).isEqualTo(Level.FINE);
299 
300     // At most 64 bytes of data frame will be logged.
301     frameHandler().data(false, 3, createMessageFrame(new String(new char[1000])), 1000);
302     assertThat(logs).hasSize(1);
303     log = logs.remove(0);
304     String data = log.getMessage();
305     assertThat(data).endsWith("...");
306     assertThat(data.substring(data.indexOf("bytes="), data.indexOf("..."))).hasLength(64 * 2 + 6);
307 
308     // A SETTINGS ACK frame is sent out after receiving SETTINGS frame.
309     frameHandler().settings(false, new Settings());
310     assertThat(logs).hasSize(2);
311     log = logs.remove(0);
312     assertThat(log.getMessage()).startsWith(Direction.INBOUND + " SETTINGS: ack=false");
313     assertThat(log.getLevel()).isEqualTo(Level.FINE);
314     log = logs.remove(0);
315     assertThat(log.getMessage()).startsWith(Direction.OUTBOUND + " SETTINGS: ack=true");
316     assertThat(log.getLevel()).isEqualTo(Level.FINE);
317 
318     // A PING ACK frame is sent out after receiving PING frame.
319     frameHandler().ping(false, 0, 0);
320     assertThat(logs).hasSize(2);
321     log = logs.remove(0);
322     assertThat(log.getMessage()).startsWith(Direction.INBOUND + " PING: ack=false");
323     assertThat(log.getLevel()).isEqualTo(Level.FINE);
324     log = logs.remove(0);
325     assertThat(log.getMessage()).startsWith(Direction.OUTBOUND + " PING: ack=true");
326     assertThat(log.getLevel()).isEqualTo(Level.FINE);
327 
328     // As server push is not supported, a RST_STREAM is sent out after receiving PUSH_PROMISE frame.
329     frameHandler().pushPromise(3, 3, grpcResponseHeaders());
330     assertThat(logs).hasSize(2);
331     log = logs.remove(0);
332     assertThat(log.getMessage()).startsWith(Direction.INBOUND + " PUSH_PROMISE");
333     assertThat(log.getLevel()).isEqualTo(Level.FINE);
334     log = logs.remove(0);
335     assertThat(log.getMessage()).startsWith(Direction.OUTBOUND + " RST_STREAM");
336     assertThat(log.getLevel()).isEqualTo(Level.FINE);
337 
338     frameHandler().rstStream(3, ErrorCode.CANCEL);
339     assertThat(logs).hasSize(1);
340     log = logs.remove(0);
341     assertThat(log.getMessage()).startsWith(Direction.INBOUND + " RST_STREAM");
342     assertThat(log.getLevel()).isEqualTo(Level.FINE);
343 
344     // Outbound GO_AWAY is responded after receiving inbound GO_AWAY frame.
345     frameHandler().goAway(3, ErrorCode.CANCEL, ByteString.EMPTY);
346     assertThat(logs).hasSize(2);
347     log = logs.remove(0);
348     assertThat(log.getMessage()).startsWith(Direction.INBOUND + " GO_AWAY");
349     assertThat(log.getLevel()).isEqualTo(Level.FINE);
350     log = logs.remove(0);
351     assertThat(log.getMessage()).startsWith(Direction.OUTBOUND + " GO_AWAY");
352     assertThat(log.getLevel()).isEqualTo(Level.FINE);
353 
354     frameHandler().windowUpdate(3, 32);
355     assertThat(logs).hasSize(1);
356     log = logs.remove(0);
357     assertThat(log.getMessage()).startsWith(Direction.INBOUND + " WINDOW_UPDATE");
358     assertThat(log.getLevel()).isEqualTo(Level.FINE);
359 
360     logger.removeHandler(handler);
361   }
362 
363   @Test
maxMessageSizeShouldBeEnforced()364   public void maxMessageSizeShouldBeEnforced() throws Exception {
365     channelBuilder.maxInboundMessageSize(1);
366     initTransport();
367 
368     MockStreamListener listener = new MockStreamListener();
369     OkHttpClientStream stream =
370         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
371     stream.start(listener);
372     stream.request(1);
373     assertContainStream(3);
374     frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
375     assertNotNull(listener.headers);
376 
377     // Receive the message.
378     final String message = "Hello Client";
379     Buffer buffer = createMessageFrame(message);
380     frameHandler().data(false, 3, buffer, (int) buffer.size());
381 
382     listener.waitUntilStreamClosed();
383     assertEquals(Code.RESOURCE_EXHAUSTED, listener.status.getCode());
384     shutdownAndVerify();
385   }
386 
387   @Test
includeInitialWindowSizeInFirstSettings()388   public void includeInitialWindowSizeInFirstSettings() throws Exception {
389     channelBuilder.flowControlWindow(65535);
390     initTransport();
391 
392     ArgumentCaptor<Settings> settings = ArgumentCaptor.forClass(Settings.class);
393     verify(frameWriter, timeout(TIME_OUT_MS)).settings(settings.capture());
394     assertEquals(65535, settings.getValue().get(7));
395   }
396 
397   /**
398    * A "large" window size is anything over 65535 (the starting size for any connection-level
399    * flow control value).
400    */
401   @Test
includeInitialWindowSizeInFirstSettings_largeWindowSize()402   public void includeInitialWindowSizeInFirstSettings_largeWindowSize() throws Exception {
403     channelBuilder.flowControlWindow(75535); // 65535 + 10000
404     initTransport();
405 
406     ArgumentCaptor<Settings> settings = ArgumentCaptor.forClass(Settings.class);
407     verify(frameWriter, timeout(TIME_OUT_MS)).settings(settings.capture());
408     assertEquals(75535, settings.getValue().get(7));
409 
410     verify(frameWriter, timeout(TIME_OUT_MS)).windowUpdate(0, 10000);
411   }
412 
413   /**
414    * When nextFrame throws IOException, the transport should be aborted.
415    */
416   @Test
nextFrameThrowIoException()417   public void nextFrameThrowIoException() throws Exception {
418     initTransport();
419     MockStreamListener listener1 = new MockStreamListener();
420     MockStreamListener listener2 = new MockStreamListener();
421     OkHttpClientStream stream1 =
422         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
423     stream1.start(listener1);
424     stream1.request(1);
425     OkHttpClientStream stream2 =
426         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
427     stream2.start(listener2);
428     stream2.request(1);
429     assertEquals(2, activeStreamCount());
430     assertContainStream(3);
431     assertContainStream(5);
432     frameReader.throwIoExceptionForNextFrame();
433     listener1.waitUntilStreamClosed();
434     listener2.waitUntilStreamClosed();
435 
436     assertEquals(0, activeStreamCount());
437     assertEquals(Status.INTERNAL.getCode(), listener1.status.getCode());
438     assertEquals(NETWORK_ISSUE_MESSAGE, listener1.status.getCause().getMessage());
439     assertEquals(Status.INTERNAL.getCode(), listener2.status.getCode());
440     assertEquals(NETWORK_ISSUE_MESSAGE, listener2.status.getCause().getMessage());
441     verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(isA(Status.class));
442     verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
443     shutdownAndVerify();
444   }
445 
446   /**
447    * Test that even if an Error is thrown from the reading loop of the transport,
448    * it can still clean up and call transportShutdown() and transportTerminated() as expected
449    * by the channel.
450    */
451   @Test
nextFrameThrowsError()452   public void nextFrameThrowsError() throws Exception {
453     initTransport();
454     MockStreamListener listener = new MockStreamListener();
455     OkHttpClientStream stream =
456         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
457     stream.start(listener);
458     stream.request(1);
459     assertEquals(1, activeStreamCount());
460     assertContainStream(3);
461     frameReader.throwErrorForNextFrame();
462     listener.waitUntilStreamClosed();
463 
464     assertEquals(0, activeStreamCount());
465     assertEquals(Status.INTERNAL.getCode(), listener.status.getCode());
466     assertEquals(ERROR_MESSAGE, listener.status.getCause().getMessage());
467     verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(isA(Status.class));
468     verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
469     shutdownAndVerify();
470   }
471 
472   @Test
nextFrameReturnFalse()473   public void nextFrameReturnFalse() throws Exception {
474     initTransport();
475     MockStreamListener listener = new MockStreamListener();
476     OkHttpClientStream stream =
477         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
478     stream.start(listener);
479     stream.request(1);
480     frameReader.nextFrameAtEndOfStream();
481     listener.waitUntilStreamClosed();
482     assertEquals(Status.UNAVAILABLE.getCode(), listener.status.getCode());
483     verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(isA(Status.class));
484     verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
485     shutdownAndVerify();
486   }
487 
488   @Test
readMessages()489   public void readMessages() throws Exception {
490     initTransport();
491     final int numMessages = 10;
492     final String message = "Hello Client";
493     MockStreamListener listener = new MockStreamListener();
494     OkHttpClientStream stream =
495         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
496     stream.start(listener);
497     stream.request(numMessages);
498     assertContainStream(3);
499     frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
500     assertNotNull(listener.headers);
501     for (int i = 0; i < numMessages; i++) {
502       Buffer buffer = createMessageFrame(message + i);
503       frameHandler().data(false, 3, buffer, (int) buffer.size());
504     }
505     frameHandler().headers(true, true, 3, 0, grpcResponseTrailers(), HeadersMode.HTTP_20_HEADERS);
506     listener.waitUntilStreamClosed();
507     assertEquals(Status.OK, listener.status);
508     assertNotNull(listener.trailers);
509     assertEquals(numMessages, listener.messages.size());
510     for (int i = 0; i < numMessages; i++) {
511       assertEquals(message + i, listener.messages.get(i));
512     }
513     shutdownAndVerify();
514   }
515 
516   @Test
receivedHeadersForInvalidStreamShouldKillConnection()517   public void receivedHeadersForInvalidStreamShouldKillConnection() throws Exception {
518     initTransport();
519     // Empty headers block without correct content type or status
520     frameHandler().headers(false, false, 3, 0, new ArrayList<Header>(),
521         HeadersMode.HTTP_20_HEADERS);
522     verify(frameWriter, timeout(TIME_OUT_MS))
523         .goAway(eq(0), eq(ErrorCode.PROTOCOL_ERROR), any(byte[].class));
524     verify(transportListener).transportShutdown(isA(Status.class));
525     verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
526     shutdownAndVerify();
527   }
528 
529   @Test
receivedDataForInvalidStreamShouldKillConnection()530   public void receivedDataForInvalidStreamShouldKillConnection() throws Exception {
531     initTransport();
532     frameHandler().data(false, 3, createMessageFrame(new String(new char[1000])), 1000);
533     verify(frameWriter, timeout(TIME_OUT_MS))
534         .goAway(eq(0), eq(ErrorCode.PROTOCOL_ERROR), any(byte[].class));
535     verify(transportListener).transportShutdown(isA(Status.class));
536     verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
537     shutdownAndVerify();
538   }
539 
540   @Test
invalidInboundHeadersCancelStream()541   public void invalidInboundHeadersCancelStream() throws Exception {
542     initTransport();
543     MockStreamListener listener = new MockStreamListener();
544     OkHttpClientStream stream =
545         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
546     stream.start(listener);
547     stream.request(1);
548     assertContainStream(3);
549     // Headers block without correct content type or status
550     frameHandler().headers(false, false, 3, 0, Arrays.asList(new Header("random", "4")),
551         HeadersMode.HTTP_20_HEADERS);
552     // Now wait to receive 1000 bytes of data so we can have a better error message before
553     // cancelling the streaam.
554     frameHandler().data(false, 3, createMessageFrame(new String(new char[1000])), 1000);
555     verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL));
556     assertNull(listener.headers);
557     assertEquals(Status.INTERNAL.getCode(), listener.status.getCode());
558     assertNotNull(listener.trailers);
559     assertEquals("4", listener.trailers
560         .get(Metadata.Key.of("random", Metadata.ASCII_STRING_MARSHALLER)));
561     shutdownAndVerify();
562   }
563 
564   @Test
invalidInboundTrailersPropagateToMetadata()565   public void invalidInboundTrailersPropagateToMetadata() throws Exception {
566     initTransport();
567     MockStreamListener listener = new MockStreamListener();
568     OkHttpClientStream stream =
569         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
570     stream.start(listener);
571     stream.request(1);
572     assertContainStream(3);
573     // Headers block with EOS without correct content type or status
574     frameHandler().headers(true, true, 3, 0, Arrays.asList(new Header("random", "4")),
575         HeadersMode.HTTP_20_HEADERS);
576     assertNull(listener.headers);
577     assertEquals(Status.INTERNAL.getCode(), listener.status.getCode());
578     assertNotNull(listener.trailers);
579     assertEquals("4", listener.trailers
580         .get(Metadata.Key.of("random", Metadata.ASCII_STRING_MARSHALLER)));
581     shutdownAndVerify();
582   }
583 
584   @Test
readStatus()585   public void readStatus() throws Exception {
586     initTransport();
587     MockStreamListener listener = new MockStreamListener();
588     OkHttpClientStream stream =
589         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
590     stream.start(listener);
591     assertContainStream(3);
592     frameHandler().headers(true, true, 3, 0, grpcResponseTrailers(), HeadersMode.HTTP_20_HEADERS);
593     listener.waitUntilStreamClosed();
594     assertEquals(Status.Code.OK, listener.status.getCode());
595     shutdownAndVerify();
596   }
597 
598   @Test
receiveReset()599   public void receiveReset() throws Exception {
600     initTransport();
601     MockStreamListener listener = new MockStreamListener();
602     OkHttpClientStream stream =
603         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
604     stream.start(listener);
605     assertContainStream(3);
606     frameHandler().rstStream(3, ErrorCode.PROTOCOL_ERROR);
607     listener.waitUntilStreamClosed();
608 
609     assertThat(listener.status.getDescription()).contains("Rst Stream");
610     assertThat(listener.status.getCode()).isEqualTo(Code.INTERNAL);
611     shutdownAndVerify();
612   }
613 
614 
615   @Test
receiveResetNoError()616   public void receiveResetNoError() throws Exception {
617     initTransport();
618     MockStreamListener listener = new MockStreamListener();
619     OkHttpClientStream stream =
620         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
621     stream.start(listener);
622     assertContainStream(3);
623     frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
624     Buffer buffer = createMessageFrame("a message");
625     frameHandler().data(false, 3, buffer, (int) buffer.size());
626     frameHandler().headers(true, true, 3, 0, grpcResponseTrailers(), HeadersMode.HTTP_20_HEADERS);
627     frameHandler().rstStream(3, ErrorCode.NO_ERROR);
628     stream.request(1);
629     listener.waitUntilStreamClosed();
630 
631     assertTrue(listener.status.isOk());
632     shutdownAndVerify();
633   }
634 
635   @Test
cancelStream()636   public void cancelStream() throws Exception {
637     initTransport();
638     MockStreamListener listener = new MockStreamListener();
639     OkHttpClientStream stream =
640         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
641     stream.start(listener);
642     getStream(3).cancel(Status.CANCELLED);
643     verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL));
644     listener.waitUntilStreamClosed();
645     assertEquals(OkHttpClientTransport.toGrpcStatus(ErrorCode.CANCEL).getCode(),
646         listener.status.getCode());
647     shutdownAndVerify();
648   }
649 
650   @Test
addDefaultUserAgent()651   public void addDefaultUserAgent() throws Exception {
652     initTransport();
653     MockStreamListener listener = new MockStreamListener();
654     OkHttpClientStream stream =
655         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
656     stream.start(listener);
657     Header userAgentHeader = new Header(GrpcUtil.USER_AGENT_KEY.name(),
658             GrpcUtil.getGrpcUserAgent("okhttp", null));
659     List<Header> expectedHeaders = Arrays.asList(HTTP_SCHEME_HEADER, METHOD_HEADER,
660             new Header(Header.TARGET_AUTHORITY, "notarealauthority:80"),
661             new Header(Header.TARGET_PATH, "/" + method.getFullMethodName()),
662             userAgentHeader, CONTENT_TYPE_HEADER, TE_HEADER);
663     verify(frameWriter, timeout(TIME_OUT_MS))
664         .synStream(eq(false), eq(false), eq(3), eq(0), eq(expectedHeaders));
665     getStream(3).cancel(Status.CANCELLED);
666     shutdownAndVerify();
667   }
668 
669   @Test
overrideDefaultUserAgent()670   public void overrideDefaultUserAgent() throws Exception {
671     startTransport(3, null, true, "fakeUserAgent");
672     MockStreamListener listener = new MockStreamListener();
673     OkHttpClientStream stream =
674         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
675     stream.start(listener);
676     List<Header> expectedHeaders = Arrays.asList(HTTP_SCHEME_HEADER, METHOD_HEADER,
677         new Header(Header.TARGET_AUTHORITY, "notarealauthority:80"),
678         new Header(Header.TARGET_PATH, "/" + method.getFullMethodName()),
679         new Header(GrpcUtil.USER_AGENT_KEY.name(),
680             GrpcUtil.getGrpcUserAgent("okhttp", "fakeUserAgent")),
681         CONTENT_TYPE_HEADER, TE_HEADER);
682     verify(frameWriter, timeout(TIME_OUT_MS))
683         .synStream(eq(false), eq(false), eq(3), eq(0), eq(expectedHeaders));
684     getStream(3).cancel(Status.CANCELLED);
685     shutdownAndVerify();
686   }
687 
688   @Test
cancelStreamForDeadlineExceeded()689   public void cancelStreamForDeadlineExceeded() throws Exception {
690     initTransport();
691     MockStreamListener listener = new MockStreamListener();
692     OkHttpClientStream stream =
693         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
694     stream.start(listener);
695     getStream(3).cancel(Status.DEADLINE_EXCEEDED);
696     verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL));
697     listener.waitUntilStreamClosed();
698     shutdownAndVerify();
699   }
700 
701   @Test
writeMessage()702   public void writeMessage() throws Exception {
703     initTransport();
704     final String message = "Hello Server";
705     MockStreamListener listener = new MockStreamListener();
706     OkHttpClientStream stream =
707         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
708     stream.start(listener);
709     InputStream input = new ByteArrayInputStream(message.getBytes(UTF_8));
710     assertEquals(12, input.available());
711     stream.writeMessage(input);
712     stream.flush();
713     verify(frameWriter, timeout(TIME_OUT_MS))
714         .data(eq(false), eq(3), any(Buffer.class), eq(12 + HEADER_LENGTH));
715     Buffer sentFrame = capturedBuffer.poll();
716     assertEquals(createMessageFrame(message), sentFrame);
717     stream.cancel(Status.CANCELLED);
718     shutdownAndVerify();
719   }
720 
721   @Test
transportTracer_windowSizeDefault()722   public void transportTracer_windowSizeDefault() throws Exception {
723     initTransport();
724     TransportStats stats = getTransportStats(clientTransport);
725     assertEquals(INITIAL_WINDOW_SIZE / 2, stats.remoteFlowControlWindow); // Lower bound
726     assertEquals(INITIAL_WINDOW_SIZE, stats.localFlowControlWindow);
727   }
728 
729   @Test
transportTracer_windowSize_remote()730   public void transportTracer_windowSize_remote() throws Exception {
731     initTransport();
732     TransportStats before = getTransportStats(clientTransport);
733     assertEquals(INITIAL_WINDOW_SIZE / 2, before.remoteFlowControlWindow); // Lower bound
734     assertEquals(INITIAL_WINDOW_SIZE, before.localFlowControlWindow);
735 
736     frameHandler().windowUpdate(0, 1000);
737     TransportStats after = getTransportStats(clientTransport);
738     assertEquals(INITIAL_WINDOW_SIZE / 2, after.remoteFlowControlWindow);
739     assertEquals(INITIAL_WINDOW_SIZE + 1000, after.localFlowControlWindow);
740   }
741 
742   @Test
windowUpdate()743   public void windowUpdate() throws Exception {
744     initTransport();
745     MockStreamListener listener1 = new MockStreamListener();
746     MockStreamListener listener2 = new MockStreamListener();
747     OkHttpClientStream stream1 =
748         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
749     stream1.start(listener1);
750     stream1.request(2);
751 
752     OkHttpClientStream stream2 =
753         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
754     stream2.start(listener2);
755     stream2.request(2);
756     assertEquals(2, activeStreamCount());
757     stream1 = getStream(3);
758     stream2 = getStream(5);
759 
760     frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
761     frameHandler().headers(false, false, 5, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
762 
763     int messageLength = INITIAL_WINDOW_SIZE / 4;
764     byte[] fakeMessage = new byte[messageLength];
765 
766     // Stream 1 receives a message
767     Buffer buffer = createMessageFrame(fakeMessage);
768     int messageFrameLength = (int) buffer.size();
769     frameHandler().data(false, 3, buffer, messageFrameLength);
770 
771     // Stream 2 receives a message
772     buffer = createMessageFrame(fakeMessage);
773     frameHandler().data(false, 5, buffer, messageFrameLength);
774 
775     verify(frameWriter, timeout(TIME_OUT_MS))
776         .windowUpdate(eq(0), eq((long) 2 * messageFrameLength));
777     reset(frameWriter);
778 
779     // Stream 1 receives another message
780     buffer = createMessageFrame(fakeMessage);
781     frameHandler().data(false, 3, buffer, messageFrameLength);
782 
783     verify(frameWriter, timeout(TIME_OUT_MS))
784         .windowUpdate(eq(3), eq((long) 2 * messageFrameLength));
785 
786     // Stream 2 receives another message
787     buffer = createMessageFrame(fakeMessage);
788     frameHandler().data(false, 5, buffer, messageFrameLength);
789 
790     verify(frameWriter, timeout(TIME_OUT_MS))
791         .windowUpdate(eq(5), eq((long) 2 * messageFrameLength));
792     verify(frameWriter, timeout(TIME_OUT_MS))
793         .windowUpdate(eq(0), eq((long) 2 * messageFrameLength));
794 
795     stream1.cancel(Status.CANCELLED);
796     verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL));
797     listener1.waitUntilStreamClosed();
798     assertEquals(OkHttpClientTransport.toGrpcStatus(ErrorCode.CANCEL).getCode(),
799         listener1.status.getCode());
800 
801     stream2.cancel(Status.CANCELLED);
802     verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(5), eq(ErrorCode.CANCEL));
803     listener2.waitUntilStreamClosed();
804     assertEquals(OkHttpClientTransport.toGrpcStatus(ErrorCode.CANCEL).getCode(),
805         listener2.status.getCode());
806     shutdownAndVerify();
807   }
808 
809   @Test
windowUpdateWithInboundFlowControl()810   public void windowUpdateWithInboundFlowControl() throws Exception {
811     initTransport();
812     MockStreamListener listener = new MockStreamListener();
813     OkHttpClientStream stream =
814         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
815     stream.start(listener);
816     int messageLength = INITIAL_WINDOW_SIZE / 2 + 1;
817     byte[] fakeMessage = new byte[messageLength];
818 
819     frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
820     Buffer buffer = createMessageFrame(fakeMessage);
821     long messageFrameLength = buffer.size();
822     frameHandler().data(false, 3, buffer, (int) messageFrameLength);
823     ArgumentCaptor<Integer> idCaptor = ArgumentCaptor.forClass(Integer.class);
824     verify(frameWriter, timeout(TIME_OUT_MS)).windowUpdate(
825         idCaptor.capture(), eq(messageFrameLength));
826     // Should only send window update for the connection.
827     assertEquals(1, idCaptor.getAllValues().size());
828     assertEquals(0, (int)idCaptor.getValue());
829 
830     stream.request(1);
831     // We return the bytes for the stream window as we read the message.
832     verify(frameWriter, timeout(TIME_OUT_MS)).windowUpdate(eq(3), eq(messageFrameLength));
833 
834     getStream(3).cancel(Status.CANCELLED);
835     verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL));
836     listener.waitUntilStreamClosed();
837     assertEquals(OkHttpClientTransport.toGrpcStatus(ErrorCode.CANCEL).getCode(),
838         listener.status.getCode());
839     shutdownAndVerify();
840   }
841 
842   /**
843    * Outbound flow control where the initial flow control window stays at the default size of 65535.
844    */
845   @Test
outboundFlowControl()846   public void outboundFlowControl() throws Exception {
847     initTransport();
848     MockStreamListener listener = new MockStreamListener();
849     OkHttpClientStream stream =
850         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
851     stream.start(listener);
852 
853     // Outbound window always starts at 65535 until changed by Settings.INITIAL_WINDOW_SIZE
854     int initialOutboundWindowSize = 65535;
855     int messageLength = initialOutboundWindowSize / 2 + 1;
856 
857     // The first message should be sent out.
858     InputStream input = new ByteArrayInputStream(new byte[messageLength]);
859     stream.writeMessage(input);
860     stream.flush();
861     verify(frameWriter, timeout(TIME_OUT_MS)).data(
862         eq(false), eq(3), any(Buffer.class), eq(messageLength + HEADER_LENGTH));
863 
864     // The second message should be partially sent out.
865     input = new ByteArrayInputStream(new byte[messageLength]);
866     stream.writeMessage(input);
867     stream.flush();
868     int partiallySentSize = initialOutboundWindowSize - messageLength - HEADER_LENGTH;
869     verify(frameWriter, timeout(TIME_OUT_MS))
870         .data(eq(false), eq(3), any(Buffer.class), eq(partiallySentSize));
871 
872     // Get more credit so the rest of the data should be sent out.
873     frameHandler().windowUpdate(3, initialOutboundWindowSize);
874     frameHandler().windowUpdate(0, initialOutboundWindowSize);
875     verify(frameWriter, timeout(TIME_OUT_MS)).data(
876         eq(false), eq(3), any(Buffer.class),
877         eq(messageLength + HEADER_LENGTH - partiallySentSize));
878 
879     stream.cancel(Status.CANCELLED);
880     listener.waitUntilStreamClosed();
881     shutdownAndVerify();
882   }
883 
884   /**
885    * Outbound flow control where the initial window size is reduced before a stream is started.
886    */
887   @Test
outboundFlowControl_smallWindowSize()888   public void outboundFlowControl_smallWindowSize() throws Exception {
889     initTransport();
890 
891     int initialOutboundWindowSize = 100;
892     setInitialWindowSize(initialOutboundWindowSize);
893 
894     MockStreamListener listener = new MockStreamListener();
895     OkHttpClientStream stream =
896             clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
897     stream.start(listener);
898 
899     int messageLength = 75;
900     // The first message should be sent out.
901     InputStream input = new ByteArrayInputStream(new byte[messageLength]);
902     stream.writeMessage(input);
903     stream.flush();
904     verify(frameWriter, timeout(TIME_OUT_MS)).data(
905             eq(false), eq(3), any(Buffer.class), eq(messageLength + HEADER_LENGTH));
906 
907     // The second message should be partially sent out.
908     input = new ByteArrayInputStream(new byte[messageLength]);
909     stream.writeMessage(input);
910     stream.flush();
911     int partiallySentSize = initialOutboundWindowSize - messageLength - HEADER_LENGTH;
912     verify(frameWriter, timeout(TIME_OUT_MS))
913             .data(eq(false), eq(3), any(Buffer.class), eq(partiallySentSize));
914 
915     // Get more credit so the rest of the data should be sent out.
916     frameHandler().windowUpdate(3, initialOutboundWindowSize);
917     verify(frameWriter, timeout(TIME_OUT_MS)).data(
918             eq(false), eq(3), any(Buffer.class),
919             eq(messageLength + HEADER_LENGTH - partiallySentSize));
920 
921     stream.cancel(Status.CANCELLED);
922     listener.waitUntilStreamClosed();
923     shutdownAndVerify();
924   }
925 
926   /**
927    * Outbound flow control where the initial window size is increased before a stream is started.
928    */
929   @Test
outboundFlowControl_bigWindowSize()930   public void outboundFlowControl_bigWindowSize() throws Exception {
931     initTransport();
932 
933     int initialOutboundWindowSize = 131070; // 65535 * 2
934     setInitialWindowSize(initialOutboundWindowSize);
935     frameHandler().windowUpdate(0, 65535);
936 
937     MockStreamListener listener = new MockStreamListener();
938     OkHttpClientStream stream =
939             clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
940     stream.start(listener);
941 
942     int messageLength = 100000;
943     // The first message should be sent out.
944     InputStream input = new ByteArrayInputStream(new byte[messageLength]);
945     stream.writeMessage(input);
946     stream.flush();
947     verify(frameWriter, timeout(TIME_OUT_MS)).data(
948             eq(false), eq(3), any(Buffer.class), eq(messageLength + HEADER_LENGTH));
949 
950     // The second message should be partially sent out.
951     input = new ByteArrayInputStream(new byte[messageLength]);
952     stream.writeMessage(input);
953     stream.flush();
954     int partiallySentSize = initialOutboundWindowSize - messageLength - HEADER_LENGTH;
955     verify(frameWriter, timeout(TIME_OUT_MS))
956             .data(eq(false), eq(3), any(Buffer.class), eq(partiallySentSize));
957 
958     // Get more credit so the rest of the data should be sent out.
959     frameHandler().windowUpdate(0, initialOutboundWindowSize);
960     frameHandler().windowUpdate(3, initialOutboundWindowSize);
961     verify(frameWriter, timeout(TIME_OUT_MS)).data(
962             eq(false), eq(3), any(Buffer.class),
963             eq(messageLength + HEADER_LENGTH - partiallySentSize));
964 
965     stream.cancel(Status.CANCELLED);
966     listener.waitUntilStreamClosed();
967     shutdownAndVerify();
968   }
969 
970   @Test
outboundFlowControlWithInitialWindowSizeChange()971   public void outboundFlowControlWithInitialWindowSizeChange() throws Exception {
972     initTransport();
973     MockStreamListener listener = new MockStreamListener();
974     OkHttpClientStream stream =
975         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
976     stream.start(listener);
977     int messageLength = 20;
978     setInitialWindowSize(HEADER_LENGTH + 10);
979     InputStream input = new ByteArrayInputStream(new byte[messageLength]);
980     stream.writeMessage(input);
981     stream.flush();
982     // part of the message can be sent.
983     verify(frameWriter, timeout(TIME_OUT_MS))
984         .data(eq(false), eq(3), any(Buffer.class), eq(HEADER_LENGTH + 10));
985     // Avoid connection flow control.
986     frameHandler().windowUpdate(0, HEADER_LENGTH + 10);
987 
988     // Increase initial window size
989     setInitialWindowSize(HEADER_LENGTH + 20);
990     // The rest data should be sent.
991     verify(frameWriter, timeout(TIME_OUT_MS)).data(eq(false), eq(3), any(Buffer.class), eq(10));
992     frameHandler().windowUpdate(0, 10);
993 
994     // Decrease initial window size to HEADER_LENGTH, since we've already sent
995     // out HEADER_LENGTH + 20 bytes data, the window size should be -20 now.
996     setInitialWindowSize(HEADER_LENGTH);
997     // Get 20 tokens back, still can't send any data.
998     frameHandler().windowUpdate(3, 20);
999     input = new ByteArrayInputStream(new byte[messageLength]);
1000     stream.writeMessage(input);
1001     stream.flush();
1002     // Only the previous two write operations happened.
1003     verify(frameWriter, timeout(TIME_OUT_MS).times(2))
1004         .data(anyBoolean(), anyInt(), any(Buffer.class), anyInt());
1005 
1006     // Get enough tokens to send the pending message.
1007     frameHandler().windowUpdate(3, HEADER_LENGTH + 20);
1008     verify(frameWriter, timeout(TIME_OUT_MS))
1009         .data(eq(false), eq(3), any(Buffer.class), eq(HEADER_LENGTH + 20));
1010 
1011     stream.cancel(Status.CANCELLED);
1012     listener.waitUntilStreamClosed();
1013     shutdownAndVerify();
1014   }
1015 
1016   @Test
outboundFlowControlWithInitialWindowSizeChangeInMiddleOfStream()1017   public void outboundFlowControlWithInitialWindowSizeChangeInMiddleOfStream() throws Exception {
1018     initTransport();
1019     MockStreamListener listener = new MockStreamListener();
1020     OkHttpClientStream stream =
1021         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
1022     stream.start(listener);
1023     int messageLength = 20;
1024     setInitialWindowSize(HEADER_LENGTH + 10);
1025     InputStream input = new ByteArrayInputStream(new byte[messageLength]);
1026     stream.writeMessage(input);
1027     stream.flush();
1028     // part of the message can be sent.
1029     verify(frameWriter, timeout(TIME_OUT_MS))
1030         .data(eq(false), eq(3), any(Buffer.class), eq(HEADER_LENGTH + 10));
1031     // Avoid connection flow control.
1032     frameHandler().windowUpdate(0, HEADER_LENGTH + 20);
1033 
1034     // Increase initial window size
1035     setInitialWindowSize(HEADER_LENGTH + 20);
1036 
1037     // wait until pending frames sent (inOrder doesn't support timeout)
1038     verify(frameWriter, timeout(TIME_OUT_MS).atLeastOnce())
1039         .data(eq(false), eq(3), any(Buffer.class), eq(10));
1040     // It should ack the settings, then send remaining message.
1041     InOrder inOrder = inOrder(frameWriter);
1042     inOrder.verify(frameWriter).ackSettings(any(Settings.class));
1043     inOrder.verify(frameWriter).data(eq(false), eq(3), any(Buffer.class), eq(10));
1044 
1045     stream.cancel(Status.CANCELLED);
1046     listener.waitUntilStreamClosed();
1047     shutdownAndVerify();
1048   }
1049 
1050   @Test
stopNormally()1051   public void stopNormally() throws Exception {
1052     initTransport();
1053     MockStreamListener listener1 = new MockStreamListener();
1054     MockStreamListener listener2 = new MockStreamListener();
1055     OkHttpClientStream stream1 =
1056         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
1057     stream1.start(listener1);
1058     OkHttpClientStream stream2 =
1059         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
1060     stream2.start(listener2);
1061     assertEquals(2, activeStreamCount());
1062     clientTransport.shutdown(SHUTDOWN_REASON);
1063 
1064     assertEquals(2, activeStreamCount());
1065     verify(transportListener).transportShutdown(same(SHUTDOWN_REASON));
1066 
1067     stream1.cancel(Status.CANCELLED);
1068     stream2.cancel(Status.CANCELLED);
1069     listener1.waitUntilStreamClosed();
1070     listener2.waitUntilStreamClosed();
1071     assertEquals(0, activeStreamCount());
1072     assertEquals(Status.CANCELLED.getCode(), listener1.status.getCode());
1073     assertEquals(Status.CANCELLED.getCode(), listener2.status.getCode());
1074     verify(frameWriter, timeout(TIME_OUT_MS)).goAway(eq(0), eq(ErrorCode.NO_ERROR), (byte[]) any());
1075     verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
1076     shutdownAndVerify();
1077   }
1078 
1079   @Test
receiveGoAway()1080   public void receiveGoAway() throws Exception {
1081     initTransport();
1082     // start 2 streams.
1083     MockStreamListener listener1 = new MockStreamListener();
1084     MockStreamListener listener2 = new MockStreamListener();
1085     OkHttpClientStream stream1 =
1086         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
1087     stream1.start(listener1);
1088     stream1.request(1);
1089     OkHttpClientStream stream2 =
1090         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
1091     stream2.start(listener2);
1092     stream2.request(1);
1093     assertEquals(2, activeStreamCount());
1094 
1095     // Receive goAway, max good id is 3.
1096     frameHandler().goAway(3, ErrorCode.CANCEL, ByteString.EMPTY);
1097 
1098     // Transport should be in STOPPING state.
1099     verify(transportListener).transportShutdown(isA(Status.class));
1100     verify(transportListener, never()).transportTerminated();
1101 
1102     // Stream 2 should be closed.
1103     listener2.waitUntilStreamClosed();
1104     assertEquals(1, activeStreamCount());
1105     assertEquals(Status.CANCELLED.getCode(), listener2.status.getCode());
1106 
1107     // New stream should be failed.
1108     assertNewStreamFail();
1109 
1110     // But stream 1 should be able to send.
1111     final String sentMessage = "Should I also go away?";
1112     OkHttpClientStream stream = getStream(3);
1113     InputStream input = new ByteArrayInputStream(sentMessage.getBytes(UTF_8));
1114     assertEquals(22, input.available());
1115     stream.writeMessage(input);
1116     stream.flush();
1117     verify(frameWriter, timeout(TIME_OUT_MS))
1118         .data(eq(false), eq(3), any(Buffer.class), eq(22 + HEADER_LENGTH));
1119     Buffer sentFrame = capturedBuffer.poll();
1120     assertEquals(createMessageFrame(sentMessage), sentFrame);
1121 
1122     // And read.
1123     frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
1124     final String receivedMessage = "No, you are fine.";
1125     Buffer buffer = createMessageFrame(receivedMessage);
1126     frameHandler().data(false, 3, buffer, (int) buffer.size());
1127     frameHandler().headers(true, true, 3, 0, grpcResponseTrailers(), HeadersMode.HTTP_20_HEADERS);
1128     listener1.waitUntilStreamClosed();
1129     assertEquals(1, listener1.messages.size());
1130     assertEquals(receivedMessage, listener1.messages.get(0));
1131 
1132     // The transport should be stopped after all active streams finished.
1133     verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
1134     shutdownAndVerify();
1135   }
1136 
1137   @Test
streamIdExhausted()1138   public void streamIdExhausted() throws Exception {
1139     int startId = Integer.MAX_VALUE - 2;
1140     initTransport(startId);
1141 
1142     MockStreamListener listener = new MockStreamListener();
1143     OkHttpClientStream stream =
1144         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
1145     stream.start(listener);
1146     stream.request(1);
1147 
1148     // New stream should be failed.
1149     assertNewStreamFail();
1150 
1151     // The alive stream should be functional, receives a message.
1152     frameHandler().headers(
1153         false, false, startId, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
1154     assertNotNull(listener.headers);
1155     String message = "hello";
1156     Buffer buffer = createMessageFrame(message);
1157     frameHandler().data(false, startId, buffer, (int) buffer.size());
1158 
1159     getStream(startId).cancel(Status.CANCELLED);
1160     // Receives the second message after be cancelled.
1161     buffer = createMessageFrame(message);
1162     frameHandler().data(false, startId, buffer, (int) buffer.size());
1163 
1164     listener.waitUntilStreamClosed();
1165     // Should only have the first message delivered.
1166     assertEquals(message, listener.messages.get(0));
1167     verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(startId), eq(ErrorCode.CANCEL));
1168     verify(transportListener).transportShutdown(isA(Status.class));
1169     verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
1170     shutdownAndVerify();
1171   }
1172 
1173   @Test
pendingStreamSucceed()1174   public void pendingStreamSucceed() throws Exception {
1175     initTransport();
1176     setMaxConcurrentStreams(1);
1177     final MockStreamListener listener1 = new MockStreamListener();
1178     final MockStreamListener listener2 = new MockStreamListener();
1179     OkHttpClientStream stream1 =
1180         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
1181     stream1.start(listener1);
1182     // The second stream should be pending.
1183     OkHttpClientStream stream2 =
1184         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
1185     stream2.start(listener2);
1186     String sentMessage = "hello";
1187     InputStream input = new ByteArrayInputStream(sentMessage.getBytes(UTF_8));
1188     assertEquals(5, input.available());
1189     stream2.writeMessage(input);
1190     stream2.flush();
1191     stream2.halfClose();
1192 
1193     waitForStreamPending(1);
1194     assertEquals(1, activeStreamCount());
1195 
1196     // Finish the first stream
1197     stream1.cancel(Status.CANCELLED);
1198     listener1.waitUntilStreamClosed();
1199 
1200     // The second stream should be active now, and the pending data should be sent out.
1201     assertEquals(1, activeStreamCount());
1202     assertEquals(0, clientTransport.getPendingStreamSize());
1203     verify(frameWriter, timeout(TIME_OUT_MS))
1204         .data(eq(true), eq(5), any(Buffer.class), eq(5 + HEADER_LENGTH));
1205     Buffer sentFrame = capturedBuffer.poll();
1206     assertEquals(createMessageFrame(sentMessage), sentFrame);
1207     stream2.cancel(Status.CANCELLED);
1208     shutdownAndVerify();
1209   }
1210 
1211   @Test
pendingStreamCancelled()1212   public void pendingStreamCancelled() throws Exception {
1213     initTransport();
1214     setMaxConcurrentStreams(0);
1215     MockStreamListener listener = new MockStreamListener();
1216     OkHttpClientStream stream =
1217         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
1218     stream.start(listener);
1219     waitForStreamPending(1);
1220     stream.cancel(Status.CANCELLED);
1221     // The second cancel should be an no-op.
1222     stream.cancel(Status.UNKNOWN);
1223     listener.waitUntilStreamClosed();
1224     assertEquals(0, clientTransport.getPendingStreamSize());
1225     assertEquals(Status.CANCELLED.getCode(), listener.status.getCode());
1226     shutdownAndVerify();
1227   }
1228 
1229   @Test
pendingStreamFailedByGoAway()1230   public void pendingStreamFailedByGoAway() throws Exception {
1231     initTransport();
1232     setMaxConcurrentStreams(1);
1233     final MockStreamListener listener1 = new MockStreamListener();
1234     final MockStreamListener listener2 = new MockStreamListener();
1235     OkHttpClientStream stream1 =
1236         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
1237     stream1.start(listener1);
1238     // The second stream should be pending.
1239     OkHttpClientStream stream2 =
1240         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
1241     stream2.start(listener2);
1242 
1243     waitForStreamPending(1);
1244     assertEquals(1, activeStreamCount());
1245 
1246     // Receives GO_AWAY.
1247     frameHandler().goAway(99, ErrorCode.CANCEL, ByteString.EMPTY);
1248 
1249     listener2.waitUntilStreamClosed();
1250     assertEquals(Status.CANCELLED.getCode(), listener2.status.getCode());
1251     assertEquals(0, clientTransport.getPendingStreamSize());
1252 
1253     // active stream should not be affected.
1254     assertEquals(1, activeStreamCount());
1255     getStream(3).cancel(Status.CANCELLED);
1256     shutdownAndVerify();
1257   }
1258 
1259   @Test
pendingStreamSucceedAfterShutdown()1260   public void pendingStreamSucceedAfterShutdown() throws Exception {
1261     initTransport();
1262     setMaxConcurrentStreams(0);
1263     final MockStreamListener listener = new MockStreamListener();
1264     // The second stream should be pending.
1265     OkHttpClientStream stream =
1266         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
1267     stream.start(listener);
1268     waitForStreamPending(1);
1269 
1270     clientTransport.shutdown(SHUTDOWN_REASON);
1271     setMaxConcurrentStreams(1);
1272     verify(frameWriter, timeout(TIME_OUT_MS))
1273         .synStream(anyBoolean(), anyBoolean(), eq(3), anyInt(), anyListHeader());
1274     assertEquals(1, activeStreamCount());
1275     stream.cancel(Status.CANCELLED);
1276     shutdownAndVerify();
1277   }
1278 
1279   @Test
pendingStreamFailedByIdExhausted()1280   public void pendingStreamFailedByIdExhausted() throws Exception {
1281     int startId = Integer.MAX_VALUE - 4;
1282     initTransport(startId);
1283     setMaxConcurrentStreams(1);
1284 
1285     final MockStreamListener listener1 = new MockStreamListener();
1286     final MockStreamListener listener2 = new MockStreamListener();
1287     final MockStreamListener listener3 = new MockStreamListener();
1288 
1289     OkHttpClientStream stream1 =
1290         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
1291     stream1.start(listener1);
1292 
1293     // The second and third stream should be pending.
1294     OkHttpClientStream stream2 =
1295         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
1296     stream2.start(listener2);
1297     OkHttpClientStream stream3 =
1298         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
1299     stream3.start(listener3);
1300 
1301     waitForStreamPending(2);
1302     assertEquals(1, activeStreamCount());
1303 
1304     // Now finish stream1, stream2 should be started and exhaust the id,
1305     // so stream3 should be failed.
1306     stream1.cancel(Status.CANCELLED);
1307     listener1.waitUntilStreamClosed();
1308     listener3.waitUntilStreamClosed();
1309     assertEquals(Status.UNAVAILABLE.getCode(), listener3.status.getCode());
1310     assertEquals(0, clientTransport.getPendingStreamSize());
1311     assertEquals(1, activeStreamCount());
1312     stream2 = getStream(startId + 2);
1313     stream2.cancel(Status.CANCELLED);
1314     shutdownAndVerify();
1315   }
1316 
1317   @Test
receivingWindowExceeded()1318   public void receivingWindowExceeded() throws Exception {
1319     initTransport();
1320     MockStreamListener listener = new MockStreamListener();
1321     OkHttpClientStream stream =
1322         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
1323     stream.start(listener);
1324     stream.request(1);
1325 
1326     frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
1327 
1328     int messageLength = INITIAL_WINDOW_SIZE + 1;
1329     byte[] fakeMessage = new byte[messageLength];
1330     Buffer buffer = createMessageFrame(fakeMessage);
1331     int messageFrameLength = (int) buffer.size();
1332     frameHandler().data(false, 3, buffer, messageFrameLength);
1333 
1334     listener.waitUntilStreamClosed();
1335     assertEquals(Status.INTERNAL.getCode(), listener.status.getCode());
1336     assertEquals("Received data size exceeded our receiving window size",
1337         listener.status.getDescription());
1338     verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.FLOW_CONTROL_ERROR));
1339     shutdownAndVerify();
1340   }
1341 
1342   @Test
unaryHeadersShouldNotBeFlushed()1343   public void unaryHeadersShouldNotBeFlushed() throws Exception {
1344     // By default the method is a Unary call
1345     shouldHeadersBeFlushed(false);
1346     shutdownAndVerify();
1347   }
1348 
1349   @Test
serverStreamingHeadersShouldNotBeFlushed()1350   public void serverStreamingHeadersShouldNotBeFlushed() throws Exception {
1351     method = method.toBuilder().setType(MethodType.SERVER_STREAMING).build();
1352     shouldHeadersBeFlushed(false);
1353     shutdownAndVerify();
1354   }
1355 
1356   @Test
clientStreamingHeadersShouldBeFlushed()1357   public void clientStreamingHeadersShouldBeFlushed() throws Exception {
1358     method = method.toBuilder().setType(MethodType.CLIENT_STREAMING).build();
1359     shouldHeadersBeFlushed(true);
1360     shutdownAndVerify();
1361   }
1362 
1363   @Test
duplexStreamingHeadersShouldNotBeFlushed()1364   public void duplexStreamingHeadersShouldNotBeFlushed() throws Exception {
1365     method = method.toBuilder().setType(MethodType.BIDI_STREAMING).build();
1366     shouldHeadersBeFlushed(true);
1367     shutdownAndVerify();
1368   }
1369 
shouldHeadersBeFlushed(boolean shouldBeFlushed)1370   private void shouldHeadersBeFlushed(boolean shouldBeFlushed) throws Exception {
1371     initTransport();
1372     MockStreamListener listener = new MockStreamListener();
1373     OkHttpClientStream stream =
1374         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
1375     stream.start(listener);
1376     verify(frameWriter, timeout(TIME_OUT_MS)).synStream(
1377         eq(false), eq(false), eq(3), eq(0), ArgumentMatchers.<Header>anyList());
1378     if (shouldBeFlushed) {
1379       verify(frameWriter, timeout(TIME_OUT_MS)).flush();
1380     } else {
1381       verify(frameWriter, timeout(TIME_OUT_MS).times(0)).flush();
1382     }
1383     stream.cancel(Status.CANCELLED);
1384   }
1385 
1386   @Test
receiveDataWithoutHeader()1387   public void receiveDataWithoutHeader() throws Exception {
1388     initTransport();
1389     MockStreamListener listener = new MockStreamListener();
1390     OkHttpClientStream stream =
1391         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
1392     stream.start(listener);
1393     stream.request(1);
1394     Buffer buffer = createMessageFrame(new byte[1]);
1395     frameHandler().data(false, 3, buffer, (int) buffer.size());
1396 
1397     // Trigger the failure by a trailer.
1398     frameHandler().headers(
1399         true, true, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
1400 
1401     listener.waitUntilStreamClosed();
1402     assertEquals(Status.INTERNAL.getCode(), listener.status.getCode());
1403     assertTrue(listener.status.getDescription().startsWith("headers not received before payload"));
1404     assertEquals(0, listener.messages.size());
1405     shutdownAndVerify();
1406   }
1407 
1408   @Test
receiveDataWithoutHeaderAndTrailer()1409   public void receiveDataWithoutHeaderAndTrailer() throws Exception {
1410     initTransport();
1411     MockStreamListener listener = new MockStreamListener();
1412     OkHttpClientStream stream =
1413         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
1414     stream.start(listener);
1415     stream.request(1);
1416     Buffer buffer = createMessageFrame(new byte[1]);
1417     frameHandler().data(false, 3, buffer, (int) buffer.size());
1418 
1419     // Trigger the failure by a data frame.
1420     buffer = createMessageFrame(new byte[1]);
1421     frameHandler().data(true, 3, buffer, (int) buffer.size());
1422 
1423     listener.waitUntilStreamClosed();
1424     assertEquals(Status.INTERNAL.getCode(), listener.status.getCode());
1425     assertTrue(listener.status.getDescription().startsWith("headers not received before payload"));
1426     assertEquals(0, listener.messages.size());
1427     shutdownAndVerify();
1428   }
1429 
1430   @Test
receiveLongEnoughDataWithoutHeaderAndTrailer()1431   public void receiveLongEnoughDataWithoutHeaderAndTrailer() throws Exception {
1432     initTransport();
1433     MockStreamListener listener = new MockStreamListener();
1434     OkHttpClientStream stream =
1435         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
1436     stream.start(listener);
1437     stream.request(1);
1438     Buffer buffer = createMessageFrame(new byte[1000]);
1439     frameHandler().data(false, 3, buffer, (int) buffer.size());
1440 
1441     // Once we receive enough detail, we cancel the stream. so we should have sent cancel.
1442     verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL));
1443 
1444     listener.waitUntilStreamClosed();
1445     assertEquals(Status.INTERNAL.getCode(), listener.status.getCode());
1446     assertTrue(listener.status.getDescription().startsWith("headers not received before payload"));
1447     assertEquals(0, listener.messages.size());
1448     shutdownAndVerify();
1449   }
1450 
1451   @Test
receiveDataForUnknownStreamUpdateConnectionWindow()1452   public void receiveDataForUnknownStreamUpdateConnectionWindow() throws Exception {
1453     initTransport();
1454     MockStreamListener listener = new MockStreamListener();
1455     OkHttpClientStream stream =
1456         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
1457     stream.start(listener);
1458     stream.cancel(Status.CANCELLED);
1459 
1460     Buffer buffer = createMessageFrame(
1461         new byte[INITIAL_WINDOW_SIZE / 2 + 1]);
1462     frameHandler().data(false, 3, buffer, (int) buffer.size());
1463     // Should still update the connection window even stream 3 is gone.
1464     verify(frameWriter, timeout(TIME_OUT_MS)).windowUpdate(0,
1465         HEADER_LENGTH + INITIAL_WINDOW_SIZE / 2 + 1);
1466     buffer = createMessageFrame(
1467         new byte[INITIAL_WINDOW_SIZE / 2 + 1]);
1468 
1469     // This should kill the connection, since we never created stream 5.
1470     frameHandler().data(false, 5, buffer, (int) buffer.size());
1471     verify(frameWriter, timeout(TIME_OUT_MS))
1472         .goAway(eq(0), eq(ErrorCode.PROTOCOL_ERROR), any(byte[].class));
1473     verify(transportListener).transportShutdown(isA(Status.class));
1474     verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
1475     shutdownAndVerify();
1476   }
1477 
1478   @Test
receiveWindowUpdateForUnknownStream()1479   public void receiveWindowUpdateForUnknownStream() throws Exception {
1480     initTransport();
1481     MockStreamListener listener = new MockStreamListener();
1482     OkHttpClientStream stream =
1483         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
1484     stream.start(listener);
1485     stream.cancel(Status.CANCELLED);
1486     // This should be ignored.
1487     frameHandler().windowUpdate(3, 73);
1488     listener.waitUntilStreamClosed();
1489     // This should kill the connection, since we never created stream 5.
1490     frameHandler().windowUpdate(5, 73);
1491     verify(frameWriter, timeout(TIME_OUT_MS))
1492         .goAway(eq(0), eq(ErrorCode.PROTOCOL_ERROR), any(byte[].class));
1493     verify(transportListener).transportShutdown(isA(Status.class));
1494     verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
1495     shutdownAndVerify();
1496   }
1497 
1498   @Test
shouldBeInitiallyReady()1499   public void shouldBeInitiallyReady() throws Exception {
1500     initTransport();
1501     MockStreamListener listener = new MockStreamListener();
1502     OkHttpClientStream stream =
1503         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
1504     stream.start(listener);
1505     assertTrue(stream.isReady());
1506     assertTrue(listener.isOnReadyCalled());
1507     stream.cancel(Status.CANCELLED);
1508     assertFalse(stream.isReady());
1509     shutdownAndVerify();
1510   }
1511 
  /**
   * Checks readiness tracking and onReady notification as queued outbound bytes cross the
   * on-ready threshold.
   *
   * <p>The initial window is set to 0 before the stream is created, and three messages are
   * written while the test expects them to stay queued. Each framed message is one byte smaller
   * than {@code DEFAULT_ONREADY_THRESHOLD}, so the stream is expected to stay ready after the
   * first and become not-ready after the second. Window updates then release one message at a
   * time; the listener is expected to be notified only once the stream becomes ready again.
   */
  @Test
  public void notifyOnReady() throws Exception {
    initTransport();
    // exactly one byte below the threshold
    int messageLength =
        AbstractStream.TransportState.DEFAULT_ONREADY_THRESHOLD - HEADER_LENGTH - 1;
    setInitialWindowSize(0);
    MockStreamListener listener = new MockStreamListener();
    OkHttpClientStream stream =
        clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
    stream.start(listener);
    assertTrue(stream.isReady());
    // Be notified at the beginning.
    assertTrue(listener.isOnReadyCalled());

    // Write a message that will not exceed the notification threshold and queue it.
    InputStream input = new ByteArrayInputStream(new byte[messageLength]);
    stream.writeMessage(input);
    stream.flush();
    assertTrue(stream.isReady());

    // Write another two messages, which are still queued; the stream is no longer ready.
    input = new ByteArrayInputStream(new byte[messageLength]);
    stream.writeMessage(input);
    stream.flush();
    assertFalse(stream.isReady());
    input = new ByteArrayInputStream(new byte[messageLength]);
    stream.writeMessage(input);
    stream.flush();
    assertFalse(stream.isReady());

    // Let the first message out: grant one message worth of window on both the connection (0)
    // and the stream (3). Two messages remain queued, so the stream is still not ready.
    frameHandler().windowUpdate(0, HEADER_LENGTH + messageLength);
    frameHandler().windowUpdate(3, HEADER_LENGTH + messageLength);
    assertFalse(stream.isReady());
    assertFalse(listener.isOnReadyCalled());

    // Let the second message out: the stream becomes ready again and the listener is notified.
    frameHandler().windowUpdate(0, HEADER_LENGTH + messageLength);
    frameHandler().windowUpdate(3, HEADER_LENGTH + messageLength);
    assertTrue(stream.isReady());
    assertTrue(listener.isOnReadyCalled());

    stream.cancel(Status.CANCELLED);
    shutdownAndVerify();
  }
1558 
1559   @Test
transportReady()1560   public void transportReady() throws Exception {
1561     initTransport();
1562     verifyNoInteractions(transportListener);
1563     frameHandler().settings(false, new Settings());
1564     verify(transportListener).transportReady();
1565     shutdownAndVerify();
1566   }
1567 
  /**
   * Exercises the ping lifecycle: a second ping issued while one is outstanding piggy-backs on it
   * (the sent counter stays at 1), an ack with a mismatching payload is ignored, the matching ack
   * completes both callbacks with the elapsed time, and a ping issued afterwards starts a new
   * operation.
   */
  @Test
  public void ping() throws Exception {
    initTransport();
    PingCallbackImpl callback1 = new PingCallbackImpl();
    clientTransport.ping(callback1, MoreExecutors.directExecutor());
    assertEquals(1, getTransportStats(clientTransport).keepAlivesSent);
    // add'l ping will be added as listener to outstanding operation
    PingCallbackImpl callback2 = new PingCallbackImpl();
    clientTransport.ping(callback2, MoreExecutors.directExecutor());
    assertEquals(1, getTransportStats(clientTransport).keepAlivesSent);

    // Capture the two int halves of the payload of the single PING frame that was written.
    ArgumentCaptor<Integer> captor1 = ArgumentCaptor.forClass(int.class);
    ArgumentCaptor<Integer> captor2 = ArgumentCaptor.forClass(int.class);
    verify(frameWriter, timeout(TIME_OUT_MS)).ping(eq(false), captor1.capture(), captor2.capture());
    // callback not invoked until we see acknowledgement
    assertEquals(0, callback1.invocationCount);
    assertEquals(0, callback2.invocationCount);

    int payload1 = captor1.getValue();
    int payload2 = captor2.getValue();
    // getting a bad ack won't complete the future
    // to make the ack "bad", we modify the payload so it doesn't match
    frameHandler().ping(true, payload1, payload2 - 1);
    // operation not complete because ack was wrong
    assertEquals(0, callback1.invocationCount);
    assertEquals(0, callback2.invocationCount);

    // Advance the test clock; the assertions below expect exactly this value as round-trip time.
    nanoTime += 10101;

    // reading the proper response should complete the future
    frameHandler().ping(true, payload1, payload2);
    assertEquals(1, callback1.invocationCount);
    assertEquals(10101, callback1.roundTripTime);
    assertNull(callback1.failureCause);
    // callback2 piggy-backed on same operation
    assertEquals(1, callback2.invocationCount);
    assertEquals(10101, callback2.roundTripTime);
    assertNull(callback2.failureCause);

    // now that previous ping is done, next request returns a different future
    callback1 = new PingCallbackImpl();
    clientTransport.ping(callback1, MoreExecutors.directExecutor());
    assertEquals(2, getTransportStats(clientTransport).keepAlivesSent);
    assertEquals(0, callback1.invocationCount);
    shutdownAndVerify();
  }
1614 
1615   @Test
ping_failsWhenTransportShutdown()1616   public void ping_failsWhenTransportShutdown() throws Exception {
1617     initTransport();
1618     PingCallbackImpl callback = new PingCallbackImpl();
1619     clientTransport.ping(callback, MoreExecutors.directExecutor());
1620     assertEquals(1, getTransportStats(clientTransport).keepAlivesSent);
1621     assertEquals(0, callback.invocationCount);
1622 
1623     clientTransport.shutdown(SHUTDOWN_REASON);
1624     // ping failed on channel shutdown
1625     assertEquals(1, callback.invocationCount);
1626     assertTrue(callback.failureCause instanceof StatusException);
1627     assertSame(SHUTDOWN_REASON, ((StatusException) callback.failureCause).getStatus());
1628 
1629     // now that handler is in terminal state, all future pings fail immediately
1630     callback = new PingCallbackImpl();
1631     clientTransport.ping(callback, MoreExecutors.directExecutor());
1632     assertEquals(1, getTransportStats(clientTransport).keepAlivesSent);
1633     assertEquals(1, callback.invocationCount);
1634     assertTrue(callback.failureCause instanceof StatusException);
1635     assertSame(SHUTDOWN_REASON, ((StatusException) callback.failureCause).getStatus());
1636     shutdownAndVerify();
1637   }
1638 
1639   @Test
ping_failsIfTransportFails()1640   public void ping_failsIfTransportFails() throws Exception {
1641     initTransport();
1642     PingCallbackImpl callback = new PingCallbackImpl();
1643     clientTransport.ping(callback, MoreExecutors.directExecutor());
1644     assertEquals(1, getTransportStats(clientTransport).keepAlivesSent);
1645     assertEquals(0, callback.invocationCount);
1646 
1647     clientTransport.onException(new IOException());
1648     // ping failed on error
1649     assertEquals(1, callback.invocationCount);
1650     assertTrue(callback.failureCause instanceof StatusException);
1651     assertEquals(Status.Code.UNAVAILABLE,
1652         ((StatusException) callback.failureCause).getStatus().getCode());
1653 
1654     // now that handler is in terminal state, all future pings fail immediately
1655     callback = new PingCallbackImpl();
1656     clientTransport.ping(callback, MoreExecutors.directExecutor());
1657     assertEquals(1, getTransportStats(clientTransport).keepAlivesSent);
1658     assertEquals(1, callback.invocationCount);
1659     assertTrue(callback.failureCause instanceof StatusException);
1660     assertEquals(Status.Code.UNAVAILABLE,
1661         ((StatusException) callback.failureCause).getStatus().getCode());
1662     shutdownAndVerify();
1663   }
1664 
1665   @Test
shutdownDuringConnecting()1666   public void shutdownDuringConnecting() throws Exception {
1667     SettableFuture<Void> delayed = SettableFuture.create();
1668     Runnable connectingCallback = () -> Futures.getUnchecked(delayed);
1669     startTransport(
1670         DEFAULT_START_STREAM_ID,
1671         connectingCallback,
1672         false,
1673         null);
1674     clientTransport.shutdown(SHUTDOWN_REASON);
1675     delayed.set(null);
1676     shutdownAndVerify();
1677   }
1678 
1679   @Test
invalidAuthorityPropagates()1680   public void invalidAuthorityPropagates() {
1681     clientTransport = new OkHttpClientTransport(
1682         channelBuilder.buildTransportFactory(),
1683         new InetSocketAddress("localhost", 1234),
1684         "invalid_authority",
1685         "userAgent",
1686         EAG_ATTRS,
1687         NO_PROXY,
1688         tooManyPingsRunnable);
1689 
1690     String host = clientTransport.getOverridenHost();
1691     int port = clientTransport.getOverridenPort();
1692 
1693     assertEquals("invalid_authority", host);
1694     assertEquals(1234, port);
1695   }
1696 
1697   @Test
unreachableServer()1698   public void unreachableServer() throws Exception {
1699     clientTransport = new OkHttpClientTransport(
1700         channelBuilder.buildTransportFactory(),
1701         new InetSocketAddress("localhost", 0),
1702         "authority",
1703         "userAgent",
1704         EAG_ATTRS,
1705         NO_PROXY,
1706         tooManyPingsRunnable);
1707 
1708     ManagedClientTransport.Listener listener = mock(ManagedClientTransport.Listener.class);
1709     clientTransport.start(listener);
1710     ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
1711     verify(listener, timeout(TIME_OUT_MS)).transportShutdown(captor.capture());
1712     Status status = captor.getValue();
1713     assertEquals(Status.UNAVAILABLE.getCode(), status.getCode());
1714     assertTrue(status.getCause().toString(), status.getCause() instanceof IOException);
1715 
1716     MockStreamListener streamListener = new MockStreamListener();
1717     clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers)
1718         .start(streamListener);
1719     streamListener.waitUntilStreamClosed();
1720     assertEquals(Status.UNAVAILABLE.getCode(), streamListener.status.getCode());
1721   }
1722 
1723   @Test
customSocketFactory()1724   public void customSocketFactory() throws Exception {
1725     RuntimeException exception = new RuntimeException("thrown by socket factory");
1726     SocketFactory socketFactory = new RuntimeExceptionThrowingSocketFactory(exception);
1727 
1728     clientTransport =
1729         new OkHttpClientTransport(
1730             channelBuilder.socketFactory(socketFactory).buildTransportFactory(),
1731             new InetSocketAddress("localhost", 0),
1732             "authority",
1733             "userAgent",
1734             EAG_ATTRS,
1735             NO_PROXY,
1736             tooManyPingsRunnable);
1737 
1738     ManagedClientTransport.Listener listener = mock(ManagedClientTransport.Listener.class);
1739     clientTransport.start(listener);
1740     ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
1741     verify(listener, timeout(TIME_OUT_MS)).transportShutdown(captor.capture());
1742     Status status = captor.getValue();
1743     assertEquals(Status.UNAVAILABLE.getCode(), status.getCode());
1744     assertSame(exception, status.getCause());
1745   }
1746 
1747   @Test
proxy_200()1748   public void proxy_200() throws Exception {
1749     ServerSocket serverSocket = new ServerSocket(0);
1750     InetSocketAddress targetAddress = InetSocketAddress.createUnresolved("theservice", 80);
1751     clientTransport = new OkHttpClientTransport(
1752         channelBuilder.buildTransportFactory(),
1753         targetAddress,
1754         "authority",
1755         "userAgent",
1756         EAG_ATTRS,
1757         HttpConnectProxiedSocketAddress.newBuilder()
1758             .setTargetAddress(targetAddress)
1759             .setProxyAddress(new InetSocketAddress("localhost", serverSocket.getLocalPort()))
1760             .build(),
1761         tooManyPingsRunnable);
1762     clientTransport.start(transportListener);
1763 
1764     Socket sock = serverSocket.accept();
1765     serverSocket.close();
1766 
1767     BufferedReader reader = new BufferedReader(new InputStreamReader(sock.getInputStream(), UTF_8));
1768     assertEquals("CONNECT theservice:80 HTTP/1.1", reader.readLine());
1769     assertEquals("Host: theservice:80", reader.readLine());
1770     while (!"".equals(reader.readLine())) {}
1771 
1772     sock.getOutputStream().write("HTTP/1.1 200 OK\r\nServer: test\r\n\r\n".getBytes(UTF_8));
1773     sock.getOutputStream().flush();
1774 
1775     assertEquals("PRI * HTTP/2.0", reader.readLine());
1776     assertEquals("", reader.readLine());
1777     assertEquals("SM", reader.readLine());
1778     assertEquals("", reader.readLine());
1779 
1780     // Empty SETTINGS
1781     sock.getOutputStream().write(new byte[] {0, 0, 0, 0, 0x4, 0});
1782     // GOAWAY
1783     sock.getOutputStream().write(new byte[] {
1784         0, 0, 0, 8, 0x7, 0,
1785         0, 0, 0, 0, // last stream id
1786         0, 0, 0, 0, // error code
1787     });
1788     sock.getOutputStream().flush();
1789 
1790     verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(isA(Status.class));
1791     while (sock.getInputStream().read() != -1) {}
1792     verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
1793     sock.close();
1794   }
1795 
1796   @Test
proxy_500()1797   public void proxy_500() throws Exception {
1798     ServerSocket serverSocket = new ServerSocket(0);
1799     InetSocketAddress targetAddress = InetSocketAddress.createUnresolved("theservice", 80);
1800     clientTransport = new OkHttpClientTransport(
1801         channelBuilder.buildTransportFactory(),
1802         targetAddress,
1803         "authority",
1804         "userAgent",
1805         EAG_ATTRS,
1806         HttpConnectProxiedSocketAddress.newBuilder()
1807             .setTargetAddress(targetAddress)
1808             .setProxyAddress(new InetSocketAddress("localhost", serverSocket.getLocalPort()))
1809             .build(),
1810         tooManyPingsRunnable);
1811     clientTransport.start(transportListener);
1812 
1813     Socket sock = serverSocket.accept();
1814     serverSocket.close();
1815 
1816     BufferedReader reader = new BufferedReader(new InputStreamReader(sock.getInputStream(), UTF_8));
1817     assertEquals("CONNECT theservice:80 HTTP/1.1", reader.readLine());
1818     assertEquals("Host: theservice:80", reader.readLine());
1819     while (!"".equals(reader.readLine())) {}
1820 
1821     final String errorText = "text describing error";
1822     sock.getOutputStream().write("HTTP/1.1 500 OH NO\r\n\r\n".getBytes(UTF_8));
1823     sock.getOutputStream().write(errorText.getBytes(UTF_8));
1824     sock.getOutputStream().flush();
1825     sock.shutdownOutput();
1826 
1827     assertEquals(-1, sock.getInputStream().read());
1828 
1829     ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
1830     verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(captor.capture());
1831     Status error = captor.getValue();
1832     assertTrue("Status didn't contain error code: " + captor.getValue(),
1833         error.getDescription().contains("500"));
1834     assertTrue("Status didn't contain error description: " + captor.getValue(),
1835         error.getDescription().contains("OH NO"));
1836     assertTrue("Status didn't contain error text: " + captor.getValue(),
1837         error.getDescription().contains(errorText));
1838     assertEquals("Not UNAVAILABLE: " + captor.getValue(),
1839         Status.UNAVAILABLE.getCode(), error.getCode());
1840     sock.close();
1841     verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
1842   }
1843 
1844   @Test
proxy_immediateServerClose()1845   public void proxy_immediateServerClose() throws Exception {
1846     ServerSocket serverSocket = new ServerSocket(0);
1847     InetSocketAddress targetAddress = InetSocketAddress.createUnresolved("theservice", 80);
1848     clientTransport = new OkHttpClientTransport(
1849         channelBuilder.buildTransportFactory(),
1850         targetAddress,
1851         "authority",
1852         "userAgent",
1853         EAG_ATTRS,
1854         HttpConnectProxiedSocketAddress.newBuilder()
1855             .setTargetAddress(targetAddress)
1856             .setProxyAddress(new InetSocketAddress("localhost", serverSocket.getLocalPort()))
1857             .build(),
1858         tooManyPingsRunnable);
1859     clientTransport.start(transportListener);
1860 
1861     Socket sock = serverSocket.accept();
1862     serverSocket.close();
1863     sock.close();
1864 
1865     ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
1866     verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(captor.capture());
1867     Status error = captor.getValue();
1868     assertTrue("Status didn't contain proxy: " + captor.getValue(),
1869         error.getDescription().contains("proxy"));
1870     assertEquals("Not UNAVAILABLE: " + captor.getValue(),
1871         Status.UNAVAILABLE.getCode(), error.getCode());
1872     verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
1873   }
1874 
1875   @Test
proxy_serverHangs()1876   public void proxy_serverHangs() throws Exception {
1877     ServerSocket serverSocket = new ServerSocket(0);
1878     InetSocketAddress targetAddress = InetSocketAddress.createUnresolved("theservice", 80);
1879     clientTransport = new OkHttpClientTransport(
1880         channelBuilder.buildTransportFactory(),
1881         targetAddress,
1882         "authority",
1883         "userAgent",
1884         EAG_ATTRS,
1885         HttpConnectProxiedSocketAddress.newBuilder()
1886             .setTargetAddress(targetAddress)
1887             .setProxyAddress(new InetSocketAddress("localhost", serverSocket.getLocalPort()))
1888             .build(),
1889         tooManyPingsRunnable);
1890     clientTransport.proxySocketTimeout = 10;
1891     clientTransport.start(transportListener);
1892 
1893     Socket sock = serverSocket.accept();
1894     serverSocket.close();
1895 
1896     BufferedReader reader = new BufferedReader(new InputStreamReader(sock.getInputStream(), UTF_8));
1897     assertEquals("CONNECT theservice:80 HTTP/1.1", reader.readLine());
1898     assertEquals("Host: theservice:80", reader.readLine());
1899     while (!"".equals(reader.readLine())) {}
1900 
1901     verify(transportListener, timeout(200)).transportShutdown(any(Status.class));
1902     verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
1903     sock.close();
1904   }
1905 
1906   @Test
goAway_notUtf8()1907   public void goAway_notUtf8() throws Exception {
1908     initTransport();
1909     // 0xFF is never permitted in UTF-8. 0xF0 should have 3 continuations following, and 0x0a isn't
1910     // a continuation.
1911     frameHandler().goAway(
1912         0, ErrorCode.ENHANCE_YOUR_CALM, ByteString.of((byte) 0xFF, (byte) 0xF0, (byte) 0x0a));
1913 
1914     shutdownAndVerify();
1915   }
1916 
1917   @Test
goAway_notTooManyPings()1918   public void goAway_notTooManyPings() throws Exception {
1919     final AtomicBoolean run = new AtomicBoolean();
1920     tooManyPingsRunnable = new Runnable() {
1921       @Override
1922       public void run() {
1923         run.set(true);
1924       }
1925     };
1926     initTransport();
1927     frameHandler().goAway(0, ErrorCode.ENHANCE_YOUR_CALM, ByteString.encodeUtf8("not_many_pings"));
1928     assertFalse(run.get());
1929 
1930     shutdownAndVerify();
1931   }
1932 
1933   @Test
goAway_tooManyPings()1934   public void goAway_tooManyPings() throws Exception {
1935     final AtomicBoolean run = new AtomicBoolean();
1936     tooManyPingsRunnable = new Runnable() {
1937       @Override
1938       public void run() {
1939         run.set(true);
1940       }
1941     };
1942     initTransport();
1943     frameHandler().goAway(0, ErrorCode.ENHANCE_YOUR_CALM, ByteString.encodeUtf8("too_many_pings"));
1944     assertTrue(run.get());
1945 
1946     shutdownAndVerify();
1947   }
1948 
1949   @Test
goAway_streamListenerRpcProgress()1950   public void goAway_streamListenerRpcProgress() throws Exception {
1951     initTransport();
1952     setMaxConcurrentStreams(2);
1953     MockStreamListener listener1 = new MockStreamListener();
1954     MockStreamListener listener2 = new MockStreamListener();
1955     MockStreamListener listener3 = new MockStreamListener();
1956     OkHttpClientStream stream1 =
1957         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
1958     stream1.start(listener1);
1959     OkHttpClientStream stream2 =
1960         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
1961     stream2.start(listener2);
1962     OkHttpClientStream stream3 =
1963         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
1964     stream3.start(listener3);
1965     waitForStreamPending(1);
1966 
1967     assertEquals(2, activeStreamCount());
1968     assertContainStream(DEFAULT_START_STREAM_ID);
1969     assertContainStream(DEFAULT_START_STREAM_ID + 2);
1970 
1971     frameHandler()
1972         .goAway(DEFAULT_START_STREAM_ID, ErrorCode.CANCEL, ByteString.encodeUtf8("blablabla"));
1973 
1974     listener2.waitUntilStreamClosed();
1975     listener3.waitUntilStreamClosed();
1976     assertNull(listener1.rpcProgress);
1977     assertEquals(REFUSED, listener2.rpcProgress);
1978     assertEquals(MISCARRIED, listener3.rpcProgress);
1979     assertEquals(1, activeStreamCount());
1980     assertContainStream(DEFAULT_START_STREAM_ID);
1981 
1982     getStream(DEFAULT_START_STREAM_ID).cancel(Status.CANCELLED);
1983 
1984     listener1.waitUntilStreamClosed();
1985     assertEquals(PROCESSED, listener1.rpcProgress);
1986 
1987     shutdownAndVerify();
1988   }
1989 
1990   @Test
reset_streamListenerRpcProgress()1991   public void reset_streamListenerRpcProgress() throws Exception {
1992     initTransport();
1993     MockStreamListener listener1 = new MockStreamListener();
1994     MockStreamListener listener2 = new MockStreamListener();
1995     MockStreamListener listener3 = new MockStreamListener();
1996     OkHttpClientStream stream1 =
1997         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
1998     stream1.start(listener1);
1999     OkHttpClientStream stream2 =
2000         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
2001     stream2.start(listener2);
2002     OkHttpClientStream stream3 =
2003         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
2004     stream3.start(listener3);
2005 
2006     assertEquals(3, activeStreamCount());
2007     assertContainStream(DEFAULT_START_STREAM_ID);
2008     assertContainStream(DEFAULT_START_STREAM_ID + 2);
2009     assertContainStream(DEFAULT_START_STREAM_ID + 4);
2010 
2011     frameHandler().rstStream(DEFAULT_START_STREAM_ID + 2, ErrorCode.REFUSED_STREAM);
2012 
2013     listener2.waitUntilStreamClosed();
2014     assertNull(listener1.rpcProgress);
2015     assertEquals(REFUSED, listener2.rpcProgress);
2016     assertNull(listener3.rpcProgress);
2017 
2018     frameHandler().rstStream(DEFAULT_START_STREAM_ID, ErrorCode.CANCEL);
2019     listener1.waitUntilStreamClosed();
2020     assertEquals(PROCESSED, listener1.rpcProgress);
2021     assertNull(listener3.rpcProgress);
2022 
2023     getStream(DEFAULT_START_STREAM_ID + 4).cancel(Status.CANCELLED);
2024 
2025     listener3.waitUntilStreamClosed();
2026     assertEquals(PROCESSED, listener3.rpcProgress);
2027 
2028     shutdownAndVerify();
2029   }
2030 
2031   @Test
shutdownNow_streamListenerRpcProgress()2032   public void shutdownNow_streamListenerRpcProgress() throws Exception {
2033     initTransport();
2034     setMaxConcurrentStreams(2);
2035     MockStreamListener listener1 = new MockStreamListener();
2036     MockStreamListener listener2 = new MockStreamListener();
2037     MockStreamListener listener3 = new MockStreamListener();
2038     OkHttpClientStream stream1 =
2039         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
2040     stream1.start(listener1);
2041     OkHttpClientStream stream2 =
2042         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
2043     stream2.start(listener2);
2044     OkHttpClientStream stream3 =
2045         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
2046     stream3.start(listener3);
2047     waitForStreamPending(1);
2048 
2049     assertEquals(2, activeStreamCount());
2050     assertContainStream(DEFAULT_START_STREAM_ID);
2051     assertContainStream(DEFAULT_START_STREAM_ID + 2);
2052 
2053     clientTransport.shutdownNow(Status.INTERNAL);
2054 
2055     listener1.waitUntilStreamClosed();
2056     listener2.waitUntilStreamClosed();
2057     listener3.waitUntilStreamClosed();
2058 
2059     assertEquals(PROCESSED, listener1.rpcProgress);
2060     assertEquals(PROCESSED, listener2.rpcProgress);
2061     assertEquals(MISCARRIED, listener3.rpcProgress);
2062   }
2063 
  /** Returns the number of streams the transport currently reports as active. */
  private int activeStreamCount() {
    return clientTransport.getActiveStreams().length;
  }
2067 
  /** Looks up the transport's stream with the given HTTP/2 stream id. */
  private OkHttpClientStream getStream(int streamId) {
    return clientTransport.getStream(streamId);
  }
2071 
assertContainStream(int streamId)2072   void assertContainStream(int streamId) {
2073     assertNotNull(clientTransport.getStream(streamId));
2074   }
2075 
  /** Returns the transport's frame handler, used by tests to inject inbound frames directly. */
  private ClientFrameHandler frameHandler() throws Exception {
    return clientTransport.getHandler();
  }
2079 
waitForStreamPending(int expected)2080   private void waitForStreamPending(int expected) throws Exception {
2081     int duration = TIME_OUT_MS / 10;
2082     for (int i = 0; i < 10; i++) {
2083       if (clientTransport.getPendingStreamSize() == expected) {
2084         return;
2085       }
2086       Thread.sleep(duration);
2087     }
2088     assertEquals(expected, clientTransport.getPendingStreamSize());
2089   }
2090 
assertNewStreamFail()2091   private void assertNewStreamFail() throws Exception {
2092     MockStreamListener listener = new MockStreamListener();
2093     OkHttpClientStream stream =
2094         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
2095     stream.start(listener);
2096     listener.waitUntilStreamClosed();
2097     assertFalse(listener.status.isOk());
2098   }
2099 
setMaxConcurrentStreams(int num)2100   private void setMaxConcurrentStreams(int num) throws Exception {
2101     Settings settings = new Settings();
2102     OkHttpSettingsUtil.set(settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS, num);
2103     frameHandler().settings(false, settings);
2104   }
2105 
setInitialWindowSize(int size)2106   private void setInitialWindowSize(int size) throws Exception {
2107     Settings settings = new Settings();
2108     OkHttpSettingsUtil.set(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE, size);
2109     frameHandler().settings(false, settings);
2110   }
2111 
createMessageFrame(String message)2112   private static Buffer createMessageFrame(String message) {
2113     return createMessageFrame(message.getBytes(UTF_8));
2114   }
2115 
createMessageFrame(byte[] message)2116   private static Buffer createMessageFrame(byte[] message) {
2117     Buffer buffer = new Buffer();
2118     buffer.writeByte(0 /* UNCOMPRESSED */);
2119     buffer.writeInt(message.length);
2120     buffer.write(message);
2121     return buffer;
2122   }
2123 
grpcResponseHeaders()2124   private List<Header> grpcResponseHeaders() {
2125     return ImmutableList.of(
2126         new Header(":status", "200"),
2127         CONTENT_TYPE_HEADER);
2128   }
2129 
grpcResponseTrailers()2130   private List<Header> grpcResponseTrailers() {
2131     return ImmutableList.of(
2132         new Header(InternalStatus.CODE_KEY.name(), "0"),
2133         // Adding Content-Type and :status for testing responses with only a single HEADERS frame.
2134         new Header(":status", "200"),
2135         CONTENT_TYPE_HEADER);
2136   }
2137 
anyListHeader()2138   private static List<Header> anyListHeader() {
2139     return any();
2140   }
2141 
2142   private static class MockFrameReader implements FrameReader {
2143     final CountDownLatch closed = new CountDownLatch(1);
2144 
2145     enum Result {
2146       THROW_EXCEPTION,
2147       RETURN_FALSE,
2148       THROW_ERROR
2149     }
2150 
2151     final LinkedBlockingQueue<Result> nextResults = new LinkedBlockingQueue<>();
2152 
2153     @Override
close()2154     public void close() throws IOException {
2155       closed.countDown();
2156     }
2157 
assertClosed()2158     void assertClosed() {
2159       try {
2160         if (!closed.await(TIME_OUT_MS, TimeUnit.MILLISECONDS)) {
2161           fail("Failed waiting frame reader to be closed.");
2162         }
2163       } catch (InterruptedException e) {
2164         Thread.currentThread().interrupt();
2165         fail("Interrupted while waiting for frame reader to be closed.");
2166       }
2167     }
2168 
2169     // The wait is safe; nextFrame is called in a loop and can have spurious wakeups
2170     @SuppressWarnings("WaitNotInLoop")
2171     @Override
nextFrame(FrameReader.Handler handler)2172     public boolean nextFrame(FrameReader.Handler handler) throws IOException {
2173       Result result;
2174       try {
2175         result = nextResults.take();
2176       } catch (InterruptedException e) {
2177         Thread.currentThread().interrupt();
2178         throw new IOException(e);
2179       }
2180       switch (result) {
2181         case THROW_EXCEPTION:
2182           throw new IOException(NETWORK_ISSUE_MESSAGE);
2183         case RETURN_FALSE:
2184           return false;
2185         case THROW_ERROR:
2186           throw new Error(ERROR_MESSAGE);
2187         default:
2188           throw new UnsupportedOperationException("unimplemented: " + result);
2189       }
2190     }
2191 
throwIoExceptionForNextFrame()2192     void throwIoExceptionForNextFrame() {
2193       nextResults.add(Result.THROW_EXCEPTION);
2194     }
2195 
throwErrorForNextFrame()2196     void throwErrorForNextFrame() {
2197       nextResults.add(Result.THROW_ERROR);
2198     }
2199 
nextFrameAtEndOfStream()2200     void nextFrameAtEndOfStream() {
2201       nextResults.add(Result.RETURN_FALSE);
2202     }
2203 
2204     @Override
readConnectionPreface()2205     public void readConnectionPreface() throws IOException {
2206       // not used.
2207     }
2208   }
2209 
2210   private static class MockStreamListener implements ClientStreamListener {
2211     Status status;
2212     Metadata headers;
2213     Metadata trailers;
2214     RpcProgress rpcProgress;
2215     CountDownLatch closed = new CountDownLatch(1);
2216     ArrayList<String> messages = new ArrayList<>();
2217     boolean onReadyCalled;
2218 
MockStreamListener()2219     MockStreamListener() {
2220     }
2221 
2222     @Override
headersRead(Metadata headers)2223     public void headersRead(Metadata headers) {
2224       this.headers = headers;
2225     }
2226 
2227     @Override
messagesAvailable(MessageProducer producer)2228     public void messagesAvailable(MessageProducer producer) {
2229       InputStream inputStream;
2230       while ((inputStream = producer.next()) != null) {
2231         String msg = getContent(inputStream);
2232         if (msg != null) {
2233           messages.add(msg);
2234         }
2235       }
2236     }
2237 
2238     @Override
closed(Status status, RpcProgress rpcProgress, Metadata trailers)2239     public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
2240       this.status = status;
2241       this.trailers = trailers;
2242       this.rpcProgress = rpcProgress;
2243       closed.countDown();
2244     }
2245 
2246     @Override
onReady()2247     public void onReady() {
2248       onReadyCalled = true;
2249     }
2250 
isOnReadyCalled()2251     boolean isOnReadyCalled() {
2252       boolean value = onReadyCalled;
2253       onReadyCalled = false;
2254       return value;
2255     }
2256 
waitUntilStreamClosed()2257     void waitUntilStreamClosed() throws InterruptedException {
2258       if (!closed.await(TIME_OUT_MS, TimeUnit.MILLISECONDS)) {
2259         fail("Failed waiting stream to be closed.");
2260       }
2261     }
2262 
2263     @SuppressWarnings("Finally") // We don't care about suppressed exceptions in the test
getContent(InputStream message)2264     static String getContent(InputStream message) {
2265       BufferedReader br = new BufferedReader(new InputStreamReader(message, UTF_8));
2266       try {
2267         // Only one line message is used in this test.
2268         return br.readLine();
2269       } catch (IOException e) {
2270         return null;
2271       } finally {
2272         try {
2273           message.close();
2274         } catch (IOException e) {
2275           // Ignore
2276         }
2277       }
2278     }
2279   }
2280 
2281   private static class MockSocket extends Socket {
2282     final MockFrameReader frameReader;
2283     private final PipedOutputStream outputStream = new PipedOutputStream();
2284     private final PipedInputStream outputStreamSink = new PipedInputStream();
2285     private final PipedOutputStream inputStreamSource = new PipedOutputStream();
2286     private final PipedInputStream inputStream = new PipedInputStream();
2287 
MockSocket(MockFrameReader frameReader)2288     MockSocket(MockFrameReader frameReader) {
2289       this.frameReader = frameReader;
2290       try {
2291         outputStreamSink.connect(outputStream);
2292         inputStream.connect(inputStreamSource);
2293       } catch (IOException ex) {
2294         throw new AssertionError(ex);
2295       }
2296     }
2297 
2298     @Override
close()2299     public synchronized void close() {
2300       frameReader.nextFrameAtEndOfStream();
2301     }
2302 
2303     @Override
getLocalSocketAddress()2304     public SocketAddress getLocalSocketAddress() {
2305       return InetSocketAddress.createUnresolved("localhost", 4000);
2306     }
2307 
2308     @Override
getOutputStream()2309     public OutputStream getOutputStream() {
2310       return outputStream;
2311     }
2312 
2313     @Override
getInputStream()2314     public InputStream getInputStream() {
2315       return inputStream;
2316     }
2317   }
2318 
2319   static class PingCallbackImpl implements ClientTransport.PingCallback {
2320     int invocationCount;
2321     long roundTripTime;
2322     Throwable failureCause;
2323 
2324     @Override
onSuccess(long roundTripTimeNanos)2325     public void onSuccess(long roundTripTimeNanos) {
2326       invocationCount++;
2327       this.roundTripTime = roundTripTimeNanos;
2328     }
2329 
2330     @Override
onFailure(Throwable cause)2331     public void onFailure(Throwable cause) {
2332       invocationCount++;
2333       this.failureCause = cause;
2334     }
2335   }
2336 
shutdownAndVerify()2337   private void shutdownAndVerify() {
2338     clientTransport.shutdown(SHUTDOWN_REASON);
2339     assertEquals(0, activeStreamCount());
2340     try {
2341       verify(frameWriter, timeout(TIME_OUT_MS)).close();
2342     } catch (IOException e) {
2343       throw new RuntimeException(e);
2344     }
2345     frameReader.assertClosed();
2346   }
2347 
getTransportStats(InternalInstrumented<SocketStats> obj)2348   private static TransportStats getTransportStats(InternalInstrumented<SocketStats> obj)
2349       throws ExecutionException, InterruptedException {
2350     return obj.getStats().get().data;
2351   }
2352 
2353   /** A FrameWriter to mock with CALL_REAL_METHODS option. */
2354   private static class MockFrameWriter implements FrameWriter {
2355 
2356     private Socket socket;
2357     private final Queue<Buffer> capturedBuffer;
2358 
MockFrameWriter(Socket socket, Queue<Buffer> capturedBuffer)2359     public MockFrameWriter(Socket socket, Queue<Buffer> capturedBuffer) {
2360       // Sets a socket to close. Some tests assumes that FrameWriter will close underlying sink
2361       // which will eventually close the socket.
2362       this.socket = socket;
2363       this.capturedBuffer = capturedBuffer;
2364     }
2365 
2366     @Override
close()2367     public void close() throws IOException {
2368       socket.close();
2369     }
2370 
2371     @Override
maxDataLength()2372     public int maxDataLength() {
2373       return Integer.MAX_VALUE;
2374     }
2375 
2376     @Override
data(boolean outFinished, int streamId, Buffer source, int byteCount)2377     public void data(boolean outFinished, int streamId, Buffer source, int byteCount)
2378         throws IOException {
2379       // simulate the side effect, and captures to internal queue.
2380       Buffer capture = new Buffer();
2381       capture.write(source, byteCount);
2382       capturedBuffer.add(capture);
2383     }
2384 
2385     // rest of methods are unimplemented
2386 
2387     @Override
connectionPreface()2388     public void connectionPreface() throws IOException {}
2389 
2390     @Override
ackSettings(Settings peerSettings)2391     public void ackSettings(Settings peerSettings) throws IOException {}
2392 
2393     @Override
pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)2394     public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
2395         throws IOException {}
2396 
2397     @Override
flush()2398     public void flush() throws IOException {}
2399 
2400     @Override
synStream(boolean outFinished, boolean inFinished, int streamId, int associatedStreamId, List<Header> headerBlock)2401     public void synStream(boolean outFinished, boolean inFinished, int streamId,
2402         int associatedStreamId, List<Header> headerBlock) throws IOException {}
2403 
2404     @Override
synReply(boolean outFinished, int streamId, List<Header> headerBlock)2405     public void synReply(boolean outFinished, int streamId, List<Header> headerBlock)
2406         throws IOException {}
2407 
2408     @Override
headers(int streamId, List<Header> headerBlock)2409     public void headers(int streamId, List<Header> headerBlock) throws IOException {}
2410 
2411     @Override
rstStream(int streamId, ErrorCode errorCode)2412     public void rstStream(int streamId, ErrorCode errorCode) throws IOException {}
2413 
2414     @Override
settings(Settings okHttpSettings)2415     public void settings(Settings okHttpSettings) throws IOException {}
2416 
2417     @Override
ping(boolean ack, int payload1, int payload2)2418     public void ping(boolean ack, int payload1, int payload2) throws IOException {}
2419 
2420     @Override
goAway(int lastGoodStreamId, ErrorCode errorCode, byte[] debugData)2421     public void goAway(int lastGoodStreamId, ErrorCode errorCode, byte[] debugData)
2422         throws IOException {}
2423 
2424     @Override
windowUpdate(int streamId, long windowSizeIncrement)2425     public void windowUpdate(int streamId, long windowSizeIncrement) throws IOException {}
2426   }
2427 
2428   private static class RuntimeExceptionThrowingSocketFactory extends SocketFactory {
2429     RuntimeException exception;
2430 
RuntimeExceptionThrowingSocketFactory(RuntimeException exception)2431     private RuntimeExceptionThrowingSocketFactory(RuntimeException exception) {
2432       this.exception = exception;
2433     }
2434 
2435     @Override
createSocket(String s, int i)2436     public Socket createSocket(String s, int i) {
2437       throw exception;
2438     }
2439 
2440     @Override
createSocket(String s, int i, InetAddress inetAddress, int i1)2441     public Socket createSocket(String s, int i, InetAddress inetAddress, int i1) {
2442       throw exception;
2443     }
2444 
2445     @Override
createSocket(InetAddress inetAddress, int i)2446     public Socket createSocket(InetAddress inetAddress, int i) {
2447       throw exception;
2448     }
2449 
2450     @Override
createSocket(InetAddress inetAddress, int i, InetAddress inetAddress1, int i1)2451     public Socket createSocket(InetAddress inetAddress, int i, InetAddress inetAddress1, int i1) {
2452       throw exception;
2453     }
2454   }
2455 
2456   static class FakeSocketFactory extends SocketFactory {
2457     private Socket socket;
2458 
FakeSocketFactory(Socket socket)2459     public FakeSocketFactory(Socket socket) {
2460       this.socket = Preconditions.checkNotNull(socket, "socket");
2461     }
2462 
createSocket()2463     @Override public Socket createSocket() {
2464       Preconditions.checkNotNull(this.socket, "socket");
2465       Socket socket = this.socket;
2466       this.socket = null;
2467       return socket;
2468     }
2469 
createSocket(InetAddress host, int port)2470     @Override public Socket createSocket(InetAddress host, int port) {
2471       return createSocket();
2472     }
2473 
createSocket( InetAddress host, int port, InetAddress localAddress, int localPort)2474     @Override public Socket createSocket(
2475         InetAddress host, int port, InetAddress localAddress, int localPort) {
2476       return createSocket();
2477     }
2478 
createSocket(String host, int port)2479     @Override public Socket createSocket(String host, int port) {
2480       return createSocket();
2481     }
2482 
createSocket( String host, int port, InetAddress localHost, int localPort)2483     @Override public Socket createSocket(
2484         String host, int port, InetAddress localHost, int localPort) {
2485       return createSocket();
2486     }
2487   }
2488 
2489   static class FakeVariant implements Variant {
2490     private FrameReader frameReader;
2491     private FrameWriter frameWriter;
2492 
FakeVariant(FrameReader frameReader, FrameWriter frameWriter)2493     public FakeVariant(FrameReader frameReader, FrameWriter frameWriter) {
2494       this.frameReader = frameReader;
2495       this.frameWriter = frameWriter;
2496     }
2497 
getProtocol()2498     @Override public Protocol getProtocol() {
2499       return Protocol.HTTP_2;
2500     }
2501 
newReader(BufferedSource source, boolean client)2502     @Override public FrameReader newReader(BufferedSource source, boolean client) {
2503       Preconditions.checkNotNull(this.frameReader, "frameReader");
2504       FrameReader frameReader = this.frameReader;
2505       this.frameReader = null;
2506       return frameReader;
2507     }
2508 
newWriter(BufferedSink sink, boolean client)2509     @Override public FrameWriter newWriter(BufferedSink sink, boolean client) {
2510       Preconditions.checkNotNull(this.frameWriter, "frameWriter");
2511       FrameWriter frameWriter = this.frameWriter;
2512       this.frameWriter = null;
2513       return frameWriter;
2514     }
2515   }
2516 }
2517