// Copyright 2015 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package org.chromium.net;

import static com.google.common.truth.Truth.assertThat;

import static org.junit.Assert.fail;

import android.os.ConditionVariable;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
 * Callback that tracks information from the different stream callbacks and has a
 * method to block a thread until the stream completes on another thread.
 * Allows a test to cancel, pause, or throw an exception from an arbitrary step.
 */
public class TestBidirectionalStreamCallback extends BidirectionalStream.Callback {
    private UrlResponseInfo mResponseInfo;
    public CronetException mError;

    public ResponseStep mResponseStep = ResponseStep.NOTHING;

    public boolean mOnErrorCalled;
    public boolean mOnCanceledCalled;

    public int mHttpResponseDataLength;
    public String mResponseAsString = "";

    public UrlResponseInfo.HeaderBlock mTrailers;

    private static final int READ_BUFFER_SIZE = 32 * 1024;

    // When false, the consumer is responsible for all calls into the stream
    // that advance it.
    private boolean mAutoAdvance = true;

    // The executor thread will block on this after reaching a terminal method.
    // Terminal methods are onSucceeded, onFailed and onCanceled. Created in the
    // opened state, so by default terminal callbacks do not block.
    private final ConditionVariable mBlockOnTerminalState = new ConditionVariable(true);

    // Conditionally fail on certain steps.
    private FailureType mFailureType = FailureType.NONE;
    private ResponseStep mFailureStep = ResponseStep.NOTHING;

    // Signals when the stream is done either successfully or not.
    private final ConditionVariable mDone = new ConditionVariable();

    // Signaled on each step when mAutoAdvance is false.
    private final ConditionVariable mReadStepBlock = new ConditionVariable();
    private final ConditionVariable mWriteStepBlock = new ConditionVariable();

    // Executor Service for Cronet callbacks.
    private final ExecutorService mExecutorService =
            Executors.newSingleThreadExecutor(new ExecutorThreadFactory());
    private Thread mExecutorThread;

    // position() of ByteBuffer prior to read() call.
    private int mBufferPositionBeforeRead;

    // Data to write.
    private final ArrayList<WriteBuffer> mWriteBuffers = new ArrayList<WriteBuffer>();

    // Buffers for which the corresponding onWriteCompleted callback has not yet been received.
    private final ArrayList<WriteBuffer> mWriteBuffersToBeAcked = new ArrayList<WriteBuffer>();

    // Whether to use a direct executor.
    private final boolean mUseDirectExecutor;
    private final DirectExecutor mDirectExecutor;

    /**
     * Thread factory that remembers the thread it creates so that
     * {@link #checkOnValidThread} can compare against it.
     */
    private class ExecutorThreadFactory implements ThreadFactory {
        @Override
        public Thread newThread(Runnable r) {
            mExecutorThread = new Thread(r);
            return mExecutorThread;
        }
    }

    /** Pairs a buffer to be written with whether the stream is flushed after writing it. */
    private static class WriteBuffer {
        final ByteBuffer mBuffer;
        final boolean mFlush;

        public WriteBuffer(ByteBuffer buffer, boolean flush) {
            mBuffer = buffer;
            mFlush = flush;
        }
    }

    /** Executor that runs every task inline on the calling thread. */
    private static class DirectExecutor implements Executor {
        @Override
        public void execute(Runnable task) {
            task.run();
        }
    }

    /** The most recent callback step recorded in {@link #mResponseStep}. */
    public enum ResponseStep {
        NOTHING,
        ON_STREAM_READY,
        ON_RESPONSE_STARTED,
        ON_READ_COMPLETED,
        ON_WRITE_COMPLETED,
        ON_TRAILERS,
        ON_CANCELED,
        ON_FAILED,
        ON_SUCCEEDED,
    }

    /** The kind of failure to inject at the step configured through {@link #setFailure}. */
    public enum FailureType {
        NONE,
        CANCEL_SYNC,
        CANCEL_ASYNC,
        // Same as above, but continues to advance the stream after posting
        // the cancellation task.
        CANCEL_ASYNC_WITHOUT_PAUSE,
        THROW_SYNC
    }

    /** Creates a callback whose {@link #getExecutor} is a single-threaded executor service. */
    public TestBidirectionalStreamCallback() {
        mUseDirectExecutor = false;
        mDirectExecutor = null;
    }

    /**
     * Creates a callback that optionally uses a direct (inline) executor.
     *
     * @param useDirectExecutor when {@code true}, {@link #getExecutor} returns an executor that
     *     runs tasks on the calling thread
     */
    public TestBidirectionalStreamCallback(boolean useDirectExecutor) {
        mUseDirectExecutor = useDirectExecutor;
        mDirectExecutor = new DirectExecutor();
    }

    /**
     * Controls whether the callback executor thread blocks once it has reached a final state
     * callback. After enabling blocking, call this method again with {@code false} to let the
     * blocked thread continue.
     *
     * @param blockOnTerminalState {@code true} to block in terminal callbacks, {@code false} to
     *     release them
     */
    public void setBlockOnTerminalState(boolean blockOnTerminalState) {
        if (blockOnTerminalState) {
            mBlockOnTerminalState.close();
        } else {
            mBlockOnTerminalState.open();
        }
    }

    /**
     * Sets whether callbacks advance the stream themselves.
     *
     * @param autoAdvance when {@code false}, callbacks signal the matching step block and return
     *     without issuing the next read or write
     */
    public void setAutoAdvance(boolean autoAdvance) {
        mAutoAdvance = autoAdvance;
    }

    /**
     * Configures a failure to inject when the callback reaches a given step.
     *
     * @param failureType what to do at the step
     * @param failureStep the step at which to do it
     */
    public void setFailure(FailureType failureType, ResponseStep failureStep) {
        mFailureStep = failureStep;
        mFailureType = failureType;
    }

    /** Blocks the calling thread until a terminal callback has been invoked. */
    public void blockForDone() {
        mDone.block();
    }

    /** Blocks until the read-side step block is signaled, then re-arms it for the next step. */
    public void waitForNextReadStep() {
        mReadStepBlock.block();
        mReadStepBlock.close();
    }

    /** Blocks until the write-side step block is signaled, then re-arms it for the next step. */
    public void waitForNextWriteStep() {
        mWriteStepBlock.block();
        mWriteStepBlock.close();
    }

    /** Returns the executor on which callbacks for this object are expected to run. */
    public Executor getExecutor() {
        if (mUseDirectExecutor) {
            return mDirectExecutor;
        }
        return mExecutorService;
    }

    /**
     * Shuts down the executor service.
     *
     * @throws UnsupportedOperationException if this callback uses the direct executor
     */
    public void shutdownExecutor() {
        if (mUseDirectExecutor) {
            throw new UnsupportedOperationException("DirectExecutor doesn't support shutdown");
        }
        mExecutorService.shutdown();
    }

    /**
     * Queues {@code data} to be written, flushing the stream after it.
     *
     * @param data bytes to write
     */
    public void addWriteData(byte[] data) {
        addWriteData(data, true);
    }

    /**
     * Queues {@code data} to be written.
     *
     * @param data bytes to write; they are copied into a direct buffer
     * @param flush whether to flush the stream after this buffer has been handed to it
     */
    public void addWriteData(byte[] data, boolean flush) {
        ByteBuffer writeBuffer = ByteBuffer.allocateDirect(data.length);
        writeBuffer.put(data);
        writeBuffer.flip();
        // The same underlying buffer is tracked twice: once as pending data and once as
        // awaiting acknowledgement through onWriteCompleted.
        mWriteBuffers.add(new WriteBuffer(writeBuffer, flush));
        mWriteBuffersToBeAcked.add(new WriteBuffer(writeBuffer, flush));
    }

    @Override
    public void onStreamReady(BidirectionalStream stream) {
        checkOnValidThread();
        assertThat(stream.isDone()).isFalse();
        assertThat(mResponseStep).isEqualTo(ResponseStep.NOTHING);
        assertThat(mError).isNull();
        mResponseStep = ResponseStep.ON_STREAM_READY;
        if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) {
            return;
        }
        startNextWrite(stream);
    }

    @Override
    public void onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInfo info) {
        checkOnValidThread();
        assertThat(stream.isDone()).isFalse();
        assertThat(mResponseStep)
                .isAnyOf(
                        ResponseStep.NOTHING,
                        ResponseStep.ON_STREAM_READY,
                        ResponseStep.ON_WRITE_COMPLETED);
        assertThat(mError).isNull();

        mResponseStep = ResponseStep.ON_RESPONSE_STARTED;
        mResponseInfo = info;
        if (maybeThrowCancelOrPause(stream, mReadStepBlock)) {
            return;
        }
        startNextRead(stream);
    }

    @Override
    public void onReadCompleted(
            BidirectionalStream stream,
            UrlResponseInfo info,
            ByteBuffer byteBuffer,
            boolean endOfStream) {
        checkOnValidThread();
        assertThat(stream.isDone()).isFalse();
        assertThat(mResponseStep)
                .isAnyOf(
                        ResponseStep.ON_RESPONSE_STARTED,
                        ResponseStep.ON_READ_COMPLETED,
                        ResponseStep.ON_WRITE_COMPLETED,
                        ResponseStep.ON_TRAILERS);
        assertThat(mError).isNull();

        mResponseStep = ResponseStep.ON_READ_COMPLETED;
        mResponseInfo = info;

        final int bytesRead = byteBuffer.position() - mBufferPositionBeforeRead;
        mHttpResponseDataLength += bytesRead;
        final byte[] lastDataReceivedAsBytes = new byte[bytesRead];
        // Rewind byteBuffer.position() to pre-read() position.
        byteBuffer.position(mBufferPositionBeforeRead);
        // This restores byteBuffer.position() to its value on entrance to
        // this function.
        byteBuffer.get(lastDataReceivedAsBytes);

        // Decode with an explicit charset so the result does not depend on the platform
        // default. Each chunk is decoded on its own, so a multi-byte character that is
        // split across two reads is not reassembled.
        mResponseAsString += new String(lastDataReceivedAsBytes, StandardCharsets.UTF_8);

        if (maybeThrowCancelOrPause(stream, mReadStepBlock)) {
            return;
        }
        // Do not read if EOF has been reached.
        if (!endOfStream) {
            startNextRead(stream);
        }
    }

    @Override
    public void onWriteCompleted(
            BidirectionalStream stream,
            UrlResponseInfo info,
            ByteBuffer buffer,
            boolean endOfStream) {
        checkOnValidThread();
        assertThat(stream.isDone()).isFalse();
        assertThat(mError).isNull();
        mResponseStep = ResponseStep.ON_WRITE_COMPLETED;
        mResponseInfo = info;
        // Writes are expected to be acknowledged in the order they were queued.
        if (!mWriteBuffersToBeAcked.isEmpty()) {
            assertThat(mWriteBuffersToBeAcked.get(0).mBuffer).isEqualTo(buffer);
            mWriteBuffersToBeAcked.remove(0);
        }
        if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) {
            return;
        }
        startNextWrite(stream);
    }

    @Override
    public void onResponseTrailersReceived(
            BidirectionalStream stream,
            UrlResponseInfo info,
            UrlResponseInfo.HeaderBlock trailers) {
        checkOnValidThread();
        assertThat(stream.isDone()).isFalse();
        assertThat(mError).isNull();
        mResponseStep = ResponseStep.ON_TRAILERS;
        mResponseInfo = info;
        mTrailers = trailers;
        if (maybeThrowCancelOrPause(stream, mReadStepBlock)) {
            return;
        }
    }

    @Override
    public void onSucceeded(BidirectionalStream stream, UrlResponseInfo info) {
        checkOnValidThread();
        assertThat(stream.isDone()).isTrue();
        assertThat(mResponseStep)
                .isAnyOf(
                        ResponseStep.ON_RESPONSE_STARTED,
                        ResponseStep.ON_READ_COMPLETED,
                        ResponseStep.ON_WRITE_COMPLETED,
                        ResponseStep.ON_TRAILERS);
        assertThat(mOnErrorCalled).isFalse();
        assertThat(mOnCanceledCalled).isFalse();
        assertThat(mError).isNull();
        assertThat(mWriteBuffers).isEmpty();
        assertThat(mWriteBuffersToBeAcked).isEmpty();

        mResponseStep = ResponseStep.ON_SUCCEEDED;
        mResponseInfo = info;
        openDone();
        mBlockOnTerminalState.block();
        maybeThrowCancelOrPause(stream, mReadStepBlock);
    }

    @Override
    public void onFailed(BidirectionalStream stream, UrlResponseInfo info, CronetException error) {
        checkOnValidThread();
        assertThat(stream.isDone()).isTrue();
        // Shouldn't happen after success.
        assertThat(mResponseStep).isNotEqualTo(ResponseStep.ON_SUCCEEDED);
        // Should happen at most once for a single stream.
        assertThat(mOnErrorCalled).isFalse();
        assertThat(mOnCanceledCalled).isFalse();
        assertThat(mError).isNull();
        mResponseStep = ResponseStep.ON_FAILED;
        mResponseInfo = info;

        mOnErrorCalled = true;
        mError = error;
        openDone();
        mBlockOnTerminalState.block();
        maybeThrowCancelOrPause(stream, mReadStepBlock);
    }

    @Override
    public void onCanceled(BidirectionalStream stream, UrlResponseInfo info) {
        checkOnValidThread();
        assertThat(stream.isDone()).isTrue();
        // Should happen at most once for a single stream.
        assertThat(mOnCanceledCalled).isFalse();
        assertThat(mOnErrorCalled).isFalse();
        assertThat(mError).isNull();
        mResponseStep = ResponseStep.ON_CANCELED;
        mResponseInfo = info;

        mOnCanceledCalled = true;
        openDone();
        mBlockOnTerminalState.block();
        maybeThrowCancelOrPause(stream, mReadStepBlock);
    }

    /**
     * Starts a read into a freshly allocated direct buffer of {@code READ_BUFFER_SIZE} bytes.
     *
     * @param stream the stream to read from
     */
    public void startNextRead(BidirectionalStream stream) {
        startNextRead(stream, ByteBuffer.allocateDirect(READ_BUFFER_SIZE));
    }

    /**
     * Starts a read into {@code buffer}, recording its current position so that
     * {@link #onReadCompleted} can work out how many bytes were received.
     *
     * @param stream the stream to read from
     * @param buffer the buffer to read into
     */
    public void startNextRead(BidirectionalStream stream, ByteBuffer buffer) {
        mBufferPositionBeforeRead = buffer.position();
        stream.read(buffer);
    }

    /**
     * Hands queued write buffers to the stream, in order, up to and including the first buffer
     * that requests a flush. The last queued buffer is written with the end-of-stream flag set.
     * Does nothing when no buffers are queued.
     *
     * @param stream the stream to write to
     */
    public void startNextWrite(BidirectionalStream stream) {
        Iterator<WriteBuffer> iterator = mWriteBuffers.iterator();
        while (iterator.hasNext()) {
            WriteBuffer b = iterator.next();
            // After next(), hasNext() is false only for the final queued buffer.
            stream.write(b.mBuffer, !iterator.hasNext());
            iterator.remove();
            if (b.mFlush) {
                stream.flush();
                break;
            }
        }
    }

    /** Returns whether a terminal callback has been invoked, waiting at most one millisecond. */
    public boolean isDone() {
        // It's not mentioned by the Android docs, but block(0) seems to block
        // indefinitely, so have to block for one millisecond to get state
        // without blocking.
        return mDone.block(1);
    }

    /** Returns the number of pending Writes. */
    public int numPendingWrites() {
        return mWriteBuffers.size();
    }

    /**
     * Asserts that there is no callback error before trying to access responseInfo. Only use this
     * when you expect {@code mError} to be null.
     * @return {@link UrlResponseInfo}
     */
    public UrlResponseInfo getResponseInfoWithChecks() {
        assertThat(mError).isNull();
        assertThat(mOnErrorCalled).isFalse();
        assertThat(mResponseInfo).isNotNull();
        return mResponseInfo;
    }

    /**
     * Simply returns {@code mResponseInfo} with no nullability or error checks.
     * @return {@link UrlResponseInfo}
     */
    public UrlResponseInfo getResponseInfo() {
        return mResponseInfo;
    }

    /** Signals that the stream has reached a terminal state. */
    protected void openDone() {
        mDone.open();
    }

    /**
     * Applies the configured failure, if the current step is the failure step, or pauses when
     * auto-advance is disabled.
     *
     * @param stream the stream to cancel when a cancellation failure is configured
     * @param stepBlock the condition variable to open when pausing because auto-advance is off
     * @return {@code false} if the callback should continue to advance the stream,
     *     {@code true} otherwise
     * @throws IllegalStateException if the configured failure is {@link FailureType#THROW_SYNC}
     *     and the current step is the failure step
     */
    private boolean maybeThrowCancelOrPause(
            final BidirectionalStream stream, ConditionVariable stepBlock) {
        if (mResponseStep != mFailureStep || mFailureType == FailureType.NONE) {
            if (!mAutoAdvance) {
                stepBlock.open();
                return true;
            }
            return false;
        }

        if (mFailureType == FailureType.THROW_SYNC) {
            throw new IllegalStateException("Callback Exception.");
        }
        Runnable task =
                new Runnable() {
                    @Override
                    public void run() {
                        stream.cancel();
                    }
                };
        if (mFailureType == FailureType.CANCEL_ASYNC
                || mFailureType == FailureType.CANCEL_ASYNC_WITHOUT_PAUSE) {
            getExecutor().execute(task);
        } else {
            task.run();
        }
        return mFailureType != FailureType.CANCEL_ASYNC_WITHOUT_PAUSE;
    }

    /** Checks whether callback methods are invoked on the correct thread. */
    private void checkOnValidThread() {
        // With the direct executor, callbacks run on whichever thread invoked them, so there
        // is no single expected thread to compare against.
        if (!mUseDirectExecutor) {
            assertThat(Thread.currentThread()).isEqualTo(mExecutorThread);
        }
    }

    /**
     * A simple callback for a succeeding stream. Fails when other callbacks that should not be
     * executed are called.
     */
    public static class SimpleSucceedingCallback extends BidirectionalStream.Callback {
        /** Opened when {@link #onSucceeded} is invoked. */
        public final ConditionVariable done = new ConditionVariable();

        private final ExecutorService mExecutor;

        public SimpleSucceedingCallback() {
            mExecutor = Executors.newSingleThreadExecutor();
        }

        @Override
        public void onStreamReady(BidirectionalStream stream) {}

        @Override
        public void onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInfo info) {
            stream.read(ByteBuffer.allocateDirect(32 * 1024));
        }

        @Override
        public void onReadCompleted(
                BidirectionalStream stream,
                UrlResponseInfo info,
                ByteBuffer byteBuffer,
                boolean endOfStream) {
            byteBuffer.clear(); // we don't care about the data
            if (!endOfStream) {
                stream.read(byteBuffer);
            }
        }

        @Override
        public void onWriteCompleted(
                BidirectionalStream bidirectionalStream,
                UrlResponseInfo urlResponseInfo,
                ByteBuffer byteBuffer,
                boolean endOfStream) {
            fail();
        }

        @Override
        public void onResponseTrailersReceived(
                BidirectionalStream bidirectionalStream,
                UrlResponseInfo urlResponseInfo,
                UrlResponseInfo.HeaderBlock headerBlock) {
            fail();
        }

        @Override
        public void onSucceeded(BidirectionalStream stream, UrlResponseInfo info) {
            done.open();
        }

        @Override
        public void onFailed(
                BidirectionalStream bidirectionalStream,
                UrlResponseInfo urlResponseInfo,
                CronetException e) {
            fail(e.getMessage());
        }

        /** Returns the single-threaded executor created for this callback. */
        public ExecutorService getExecutor() {
            return mExecutor;
        }
    }
}