1 // Copyright 2015 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 package org.chromium.net;
6 
import static com.google.common.truth.Truth.assertThat;

import static org.junit.Assert.fail;

import android.os.ConditionVariable;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
20 
21 /**
 * Callback that tracks information from different callbacks and has a
 * method to block the calling thread until the stream completes on another thread.
 * Allows a test to cancel the stream, block it, or throw an exception at an arbitrary step.
25  */
26 public class TestBidirectionalStreamCallback extends BidirectionalStream.Callback {
    // UrlResponseInfo most recently handed to one of the callback methods.
    private UrlResponseInfo mResponseInfo;
    // Error handed to onFailed(); stays null unless onFailed() runs.
    public CronetException mError;

    // Last callback step that was reached.
    public ResponseStep mResponseStep = ResponseStep.NOTHING;

    // Set to true by onFailed() and onCanceled() respectively.
    public boolean mOnErrorCalled;
    public boolean mOnCanceledCalled;

    // Running total of bytes seen by onReadCompleted(), and the same bytes
    // accumulated as a String.
    public int mHttpResponseDataLength;
    public String mResponseAsString = "";

    // Trailers handed to onResponseTrailersReceived().
    public UrlResponseInfo.HeaderBlock mTrailers;

    // Capacity of the buffer allocated by startNextRead(BidirectionalStream).
    private static final int READ_BUFFER_SIZE = 32 * 1024;

    // When false, the consumer is responsible for all calls into the stream
    // that advance it.
    private boolean mAutoAdvance = true;

    // The executor thread will block on this after reaching a terminal method.
    // Terminal methods are onSucceeded, onFailed and onCanceled.
    // Created in the open state, so terminal methods do not block unless
    // setBlockOnTerminalState(true) has been called.
    private ConditionVariable mBlockOnTerminalState = new ConditionVariable(true);

    // Conditionally fail on certain steps.
    private FailureType mFailureType = FailureType.NONE;
    private ResponseStep mFailureStep = ResponseStep.NOTHING;

    // Signals when the stream is done either successfully or not.
    private final ConditionVariable mDone = new ConditionVariable();

    // Signaled on each step when mAutoAdvance is false.
    private final ConditionVariable mReadStepBlock = new ConditionVariable();
    private final ConditionVariable mWriteStepBlock = new ConditionVariable();

    // Executor Service for Cronet callbacks.
    private final ExecutorService mExecutorService =
            Executors.newSingleThreadExecutor(new ExecutorThreadFactory());
    // Thread created by ExecutorThreadFactory; compared against the current
    // thread in checkOnValidThread().
    private Thread mExecutorThread;

    // position() of ByteBuffer prior to read() call.
    private int mBufferPositionBeforeRead;

    // Data to write.
    private final ArrayList<WriteBuffer> mWriteBuffers = new ArrayList<WriteBuffer>();

    // Buffers for which we have yet to receive the corresponding
    // onWriteCompleted callback.
    private final ArrayList<WriteBuffer> mWriteBuffersToBeAcked = new ArrayList<WriteBuffer>();

    // Whether to use a direct executor. mDirectExecutor is null when the
    // no-argument constructor is used.
    private final boolean mUseDirectExecutor;
    private final DirectExecutor mDirectExecutor;
78 
79     private class ExecutorThreadFactory implements ThreadFactory {
80         @Override
newThread(Runnable r)81         public Thread newThread(Runnable r) {
82             mExecutorThread = new Thread(r);
83             return mExecutorThread;
84         }
85     }
86 
87     private static class WriteBuffer {
88         final ByteBuffer mBuffer;
89         final boolean mFlush;
90 
WriteBuffer(ByteBuffer buffer, boolean flush)91         public WriteBuffer(ByteBuffer buffer, boolean flush) {
92             mBuffer = buffer;
93             mFlush = flush;
94         }
95     }
96 
    /** Executor that runs every task synchronously on the thread that calls execute(). */
    private static class DirectExecutor implements Executor {
        @Override
        public void execute(Runnable task) {
            // No hand-off to another thread: the task runs inline.
            task.run();
        }
    }
103 
    /**
     * Identifies a callback step. {@code mResponseStep} holds the step most recently reached,
     * and {@code setFailure} uses a step to choose where a failure is triggered.
     */
    public enum ResponseStep {
        // Initial value, before any callback has run.
        NOTHING,
        ON_STREAM_READY,
        // Set by onResponseHeadersReceived().
        ON_RESPONSE_STARTED,
        ON_READ_COMPLETED,
        ON_WRITE_COMPLETED,
        // Set by onResponseTrailersReceived().
        ON_TRAILERS,
        ON_CANCELED,
        ON_FAILED,
        ON_SUCCEEDED,
    }
115 
    /** What the callback does when the step configured through {@code setFailure} is reached. */
    public enum FailureType {
        // No failure is triggered.
        NONE,
        // stream.cancel() is called inline from the callback.
        CANCEL_SYNC,
        // stream.cancel() is posted to the executor returned by getExecutor().
        CANCEL_ASYNC,
        // Same as above, but continues to advance the stream after posting
        // the cancellation task.
        CANCEL_ASYNC_WITHOUT_PAUSE,
        // The callback throws an IllegalStateException.
        THROW_SYNC
    }
125 
TestBidirectionalStreamCallback()126     public TestBidirectionalStreamCallback() {
127         mUseDirectExecutor = false;
128         mDirectExecutor = null;
129     }
130 
TestBidirectionalStreamCallback(boolean useDirectExecutor)131     public TestBidirectionalStreamCallback(boolean useDirectExecutor) {
132         mUseDirectExecutor = useDirectExecutor;
133         mDirectExecutor = new DirectExecutor();
134     }
135 
136     /**
137      * This blocks the callback executor thread once it has reached a final state callback.
138      * In order to continue execution, this method must be called again and providing {@code false}
139      * to continue execution.
140      * @param blockOnTerminalState the state to set for the executor thread
141      */
setBlockOnTerminalState(boolean blockOnTerminalState)142     public void setBlockOnTerminalState(boolean blockOnTerminalState) {
143         if (blockOnTerminalState) {
144             mBlockOnTerminalState.close();
145         } else {
146             mBlockOnTerminalState.open();
147         }
148     }
149 
    /**
     * Sets whether callbacks advance the stream on their own.
     * @param autoAdvance when {@code false}, callbacks signal the matching step block and stop
     *     instead of issuing the next read or write themselves
     */
    public void setAutoAdvance(boolean autoAdvance) {
        mAutoAdvance = autoAdvance;
    }
153 
setFailure(FailureType failureType, ResponseStep failureStep)154     public void setFailure(FailureType failureType, ResponseStep failureStep) {
155         mFailureStep = failureStep;
156         mFailureType = failureType;
157     }
158 
    /** Blocks the calling thread until a terminal callback has opened {@code mDone}. */
    public void blockForDone() {
        mDone.block();
    }
162 
    /**
     * Blocks until a callback signals the read step block, then re-arms it so that the next
     * call blocks again.
     */
    public void waitForNextReadStep() {
        mReadStepBlock.block();
        // Close only after being released, so the following wait needs a fresh signal.
        mReadStepBlock.close();
    }
167 
    /**
     * Blocks until a callback signals the write step block, then re-arms it so that the next
     * call blocks again.
     */
    public void waitForNextWriteStep() {
        mWriteStepBlock.block();
        // Close only after being released, so the following wait needs a fresh signal.
        mWriteStepBlock.close();
    }
172 
getExecutor()173     public Executor getExecutor() {
174         if (mUseDirectExecutor) {
175             return mDirectExecutor;
176         }
177         return mExecutorService;
178     }
179 
shutdownExecutor()180     public void shutdownExecutor() {
181         if (mUseDirectExecutor) {
182             throw new UnsupportedOperationException("DirectExecutor doesn't support shutdown");
183         }
184         mExecutorService.shutdown();
185     }
186 
    /**
     * Queues {@code data} for writing, with a flush after it.
     * @param data bytes to write to the stream
     */
    public void addWriteData(byte[] data) {
        addWriteData(data, true);
    }
190 
addWriteData(byte[] data, boolean flush)191     public void addWriteData(byte[] data, boolean flush) {
192         ByteBuffer writeBuffer = ByteBuffer.allocateDirect(data.length);
193         writeBuffer.put(data);
194         writeBuffer.flip();
195         mWriteBuffers.add(new WriteBuffer(writeBuffer, flush));
196         mWriteBuffersToBeAcked.add(new WriteBuffer(writeBuffer, flush));
197     }
198 
199     @Override
onStreamReady(BidirectionalStream stream)200     public void onStreamReady(BidirectionalStream stream) {
201         checkOnValidThread();
202         assertThat(stream.isDone()).isFalse();
203         assertThat(mResponseStep).isEqualTo(ResponseStep.NOTHING);
204         assertThat(mError).isNull();
205         mResponseStep = ResponseStep.ON_STREAM_READY;
206         if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) {
207             return;
208         }
209         startNextWrite(stream);
210     }
211 
212     @Override
onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInfo info)213     public void onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInfo info) {
214         checkOnValidThread();
215         assertThat(stream.isDone()).isFalse();
216         assertThat(mResponseStep)
217                 .isAnyOf(
218                         ResponseStep.NOTHING,
219                         ResponseStep.ON_STREAM_READY,
220                         ResponseStep.ON_WRITE_COMPLETED);
221         assertThat(mError).isNull();
222 
223         mResponseStep = ResponseStep.ON_RESPONSE_STARTED;
224         mResponseInfo = info;
225         if (maybeThrowCancelOrPause(stream, mReadStepBlock)) {
226             return;
227         }
228         startNextRead(stream);
229     }
230 
231     @Override
onReadCompleted( BidirectionalStream stream, UrlResponseInfo info, ByteBuffer byteBuffer, boolean endOfStream)232     public void onReadCompleted(
233             BidirectionalStream stream,
234             UrlResponseInfo info,
235             ByteBuffer byteBuffer,
236             boolean endOfStream) {
237         checkOnValidThread();
238         assertThat(stream.isDone()).isFalse();
239         assertThat(mResponseStep)
240                 .isAnyOf(
241                         ResponseStep.ON_RESPONSE_STARTED,
242                         ResponseStep.ON_READ_COMPLETED,
243                         ResponseStep.ON_WRITE_COMPLETED,
244                         ResponseStep.ON_TRAILERS);
245         assertThat(mError).isNull();
246 
247         mResponseStep = ResponseStep.ON_READ_COMPLETED;
248         mResponseInfo = info;
249 
250         final int bytesRead = byteBuffer.position() - mBufferPositionBeforeRead;
251         mHttpResponseDataLength += bytesRead;
252         final byte[] lastDataReceivedAsBytes = new byte[bytesRead];
253         // Rewind byteBuffer.position() to pre-read() position.
254         byteBuffer.position(mBufferPositionBeforeRead);
255         // This restores byteBuffer.position() to its value on entrance to
256         // this function.
257         byteBuffer.get(lastDataReceivedAsBytes);
258 
259         mResponseAsString += new String(lastDataReceivedAsBytes);
260 
261         if (maybeThrowCancelOrPause(stream, mReadStepBlock)) {
262             return;
263         }
264         // Do not read if EOF has been reached.
265         if (!endOfStream) {
266             startNextRead(stream);
267         }
268     }
269 
270     @Override
onWriteCompleted( BidirectionalStream stream, UrlResponseInfo info, ByteBuffer buffer, boolean endOfStream)271     public void onWriteCompleted(
272             BidirectionalStream stream,
273             UrlResponseInfo info,
274             ByteBuffer buffer,
275             boolean endOfStream) {
276         checkOnValidThread();
277         assertThat(stream.isDone()).isFalse();
278         assertThat(mError).isNull();
279         mResponseStep = ResponseStep.ON_WRITE_COMPLETED;
280         mResponseInfo = info;
281         if (!mWriteBuffersToBeAcked.isEmpty()) {
282             assertThat(mWriteBuffersToBeAcked.get(0).mBuffer).isEqualTo(buffer);
283             mWriteBuffersToBeAcked.remove(0);
284         }
285         if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) {
286             return;
287         }
288         startNextWrite(stream);
289     }
290 
291     @Override
onResponseTrailersReceived( BidirectionalStream stream, UrlResponseInfo info, UrlResponseInfo.HeaderBlock trailers)292     public void onResponseTrailersReceived(
293             BidirectionalStream stream,
294             UrlResponseInfo info,
295             UrlResponseInfo.HeaderBlock trailers) {
296         checkOnValidThread();
297         assertThat(stream.isDone()).isFalse();
298         assertThat(mError).isNull();
299         mResponseStep = ResponseStep.ON_TRAILERS;
300         mResponseInfo = info;
301         mTrailers = trailers;
302         if (maybeThrowCancelOrPause(stream, mReadStepBlock)) {
303             return;
304         }
305     }
306 
307     @Override
onSucceeded(BidirectionalStream stream, UrlResponseInfo info)308     public void onSucceeded(BidirectionalStream stream, UrlResponseInfo info) {
309         checkOnValidThread();
310         assertThat(stream.isDone()).isTrue();
311         assertThat(mResponseStep)
312                 .isAnyOf(
313                         ResponseStep.ON_RESPONSE_STARTED,
314                         ResponseStep.ON_READ_COMPLETED,
315                         ResponseStep.ON_WRITE_COMPLETED,
316                         ResponseStep.ON_TRAILERS);
317         assertThat(mOnErrorCalled).isFalse();
318         assertThat(mOnCanceledCalled).isFalse();
319         assertThat(mError).isNull();
320         assertThat(mWriteBuffers).isEmpty();
321         assertThat(mWriteBuffersToBeAcked).isEmpty();
322 
323         mResponseStep = ResponseStep.ON_SUCCEEDED;
324         mResponseInfo = info;
325         openDone();
326         mBlockOnTerminalState.block();
327         maybeThrowCancelOrPause(stream, mReadStepBlock);
328     }
329 
330     @Override
onFailed(BidirectionalStream stream, UrlResponseInfo info, CronetException error)331     public void onFailed(BidirectionalStream stream, UrlResponseInfo info, CronetException error) {
332         checkOnValidThread();
333         assertThat(stream.isDone()).isTrue();
334         // Shouldn't happen after success.
335         assertThat(mResponseStep).isNotEqualTo(ResponseStep.ON_SUCCEEDED);
336         // Should happen at most once for a single stream.
337         assertThat(mOnErrorCalled).isFalse();
338         assertThat(mOnCanceledCalled).isFalse();
339         assertThat(mError).isNull();
340         mResponseStep = ResponseStep.ON_FAILED;
341         mResponseInfo = info;
342 
343         mOnErrorCalled = true;
344         mError = error;
345         openDone();
346         mBlockOnTerminalState.block();
347         maybeThrowCancelOrPause(stream, mReadStepBlock);
348     }
349 
350     @Override
onCanceled(BidirectionalStream stream, UrlResponseInfo info)351     public void onCanceled(BidirectionalStream stream, UrlResponseInfo info) {
352         checkOnValidThread();
353         assertThat(stream.isDone()).isTrue();
354         // Should happen at most once for a single stream.
355         assertThat(mOnCanceledCalled).isFalse();
356         assertThat(mOnErrorCalled).isFalse();
357         assertThat(mError).isNull();
358         mResponseStep = ResponseStep.ON_CANCELED;
359         mResponseInfo = info;
360 
361         mOnCanceledCalled = true;
362         openDone();
363         mBlockOnTerminalState.block();
364         maybeThrowCancelOrPause(stream, mReadStepBlock);
365     }
366 
startNextRead(BidirectionalStream stream)367     public void startNextRead(BidirectionalStream stream) {
368         startNextRead(stream, ByteBuffer.allocateDirect(READ_BUFFER_SIZE));
369     }
370 
startNextRead(BidirectionalStream stream, ByteBuffer buffer)371     public void startNextRead(BidirectionalStream stream, ByteBuffer buffer) {
372         mBufferPositionBeforeRead = buffer.position();
373         stream.read(buffer);
374     }
375 
startNextWrite(BidirectionalStream stream)376     public void startNextWrite(BidirectionalStream stream) {
377         if (!mWriteBuffers.isEmpty()) {
378             Iterator<WriteBuffer> iterator = mWriteBuffers.iterator();
379             while (iterator.hasNext()) {
380                 WriteBuffer b = iterator.next();
381                 stream.write(b.mBuffer, !iterator.hasNext());
382                 iterator.remove();
383                 if (b.mFlush) {
384                     stream.flush();
385                     break;
386                 }
387             }
388         }
389     }
390 
isDone()391     public boolean isDone() {
392         // It's not mentioned by the Android docs, but block(0) seems to block
393         // indefinitely, so have to block for one millisecond to get state
394         // without blocking.
395         return mDone.block(1);
396     }
397 
    /**
     * Returns the number of pending writes, i.e. buffers queued through {@code addWriteData}
     * that {@code startNextWrite} has not yet handed to the stream.
     */
    public int numPendingWrites() {
        return mWriteBuffers.size();
    }
402 
403     /**
404      * Asserts that there is no callback error before trying to access responseInfo. Only use this
405      * when you expect {@code mError} to be null.
406      * @return {@link UrlResponseInfo}
407      */
getResponseInfoWithChecks()408     public UrlResponseInfo getResponseInfoWithChecks() {
409         assertThat(mError).isNull();
410         assertThat(mOnErrorCalled).isFalse();
411         assertThat(mResponseInfo).isNotNull();
412         return mResponseInfo;
413     }
414 
    /**
     * Simply returns {@code mResponseInfo} with no nullability or error checks; the result is
     * null if no callback has stored a response info yet.
     * @return {@link UrlResponseInfo}
     */
    public UrlResponseInfo getResponseInfo() {
        return mResponseInfo;
    }
422 
    /**
     * Signals that the stream has reached a terminal state, releasing threads blocked in
     * {@code blockForDone()}. Called by every terminal callback.
     */
    protected void openDone() {
        mDone.open();
    }
426 
427     /**
428      * Returns {@code false} if the callback should continue to advance the
429      * stream.
430      */
maybeThrowCancelOrPause( final BidirectionalStream stream, ConditionVariable stepBlock)431     private boolean maybeThrowCancelOrPause(
432             final BidirectionalStream stream, ConditionVariable stepBlock) {
433         if (mResponseStep != mFailureStep || mFailureType == FailureType.NONE) {
434             if (!mAutoAdvance) {
435                 stepBlock.open();
436                 return true;
437             }
438             return false;
439         }
440 
441         if (mFailureType == FailureType.THROW_SYNC) {
442             throw new IllegalStateException("Callback Exception.");
443         }
444         Runnable task =
445                 new Runnable() {
446                     @Override
447                     public void run() {
448                         stream.cancel();
449                     }
450                 };
451         if (mFailureType == FailureType.CANCEL_ASYNC
452                 || mFailureType == FailureType.CANCEL_ASYNC_WITHOUT_PAUSE) {
453             getExecutor().execute(task);
454         } else {
455             task.run();
456         }
457         return mFailureType != FailureType.CANCEL_ASYNC_WITHOUT_PAUSE;
458     }
459 
460     /** Checks whether callback methods are invoked on the correct thread. */
checkOnValidThread()461     private void checkOnValidThread() {
462         if (!mUseDirectExecutor) {
463             assertThat(Thread.currentThread()).isEqualTo(mExecutorThread);
464         }
465     }
466 
467     /**
468      * A simple callback for a succeeding stream. Fails when other callbacks that should not be
469      * executed are called.
470      */
471     public static class SimpleSucceedingCallback extends BidirectionalStream.Callback {
472         public final ConditionVariable done = new ConditionVariable();
473         private final ExecutorService mExecutor;
474 
SimpleSucceedingCallback()475         public SimpleSucceedingCallback() {
476             mExecutor = Executors.newSingleThreadExecutor();
477         }
478 
479         @Override
onStreamReady(BidirectionalStream stream)480         public void onStreamReady(BidirectionalStream stream) {}
481 
482         @Override
onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInfo info)483         public void onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInfo info) {
484             stream.read(ByteBuffer.allocateDirect(32 * 1024));
485         }
486 
487         @Override
onReadCompleted( BidirectionalStream stream, UrlResponseInfo info, ByteBuffer byteBuffer, boolean endOfStream)488         public void onReadCompleted(
489                 BidirectionalStream stream,
490                 UrlResponseInfo info,
491                 ByteBuffer byteBuffer,
492                 boolean endOfStream) {
493             byteBuffer.clear(); // we don't care about the data
494             if (!endOfStream) {
495                 stream.read(byteBuffer);
496             }
497         }
498 
499         @Override
onWriteCompleted( BidirectionalStream bidirectionalStream, UrlResponseInfo urlResponseInfo, ByteBuffer byteBuffer, boolean endOfStream)500         public void onWriteCompleted(
501                 BidirectionalStream bidirectionalStream,
502                 UrlResponseInfo urlResponseInfo,
503                 ByteBuffer byteBuffer,
504                 boolean endOfStream) {
505             fail();
506         }
507 
508         @Override
onResponseTrailersReceived( BidirectionalStream bidirectionalStream, UrlResponseInfo urlResponseInfo, UrlResponseInfo.HeaderBlock headerBlock)509         public void onResponseTrailersReceived(
510                 BidirectionalStream bidirectionalStream,
511                 UrlResponseInfo urlResponseInfo,
512                 UrlResponseInfo.HeaderBlock headerBlock) {
513             fail();
514         }
515 
516         @Override
onSucceeded(BidirectionalStream stream, UrlResponseInfo info)517         public void onSucceeded(BidirectionalStream stream, UrlResponseInfo info) {
518             done.open();
519         }
520 
521         @Override
onFailed( BidirectionalStream bidirectionalStream, UrlResponseInfo urlResponseInfo, CronetException e)522         public void onFailed(
523                 BidirectionalStream bidirectionalStream,
524                 UrlResponseInfo urlResponseInfo,
525                 CronetException e) {
526             fail(e.getMessage());
527         }
528 
getExecutor()529         public ExecutorService getExecutor() {
530             return mExecutor;
531         }
532     }
533 }
534