1 /* 2 * Copyright 2021 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 package io.grpc.binder.internal; 17 18 import javax.annotation.concurrent.GuardedBy; 19 20 /** Keeps track of the number of bytes on the wire in a single direction. */ 21 final class FlowController { 22 private final int maxUnackedBytes; 23 24 @GuardedBy("this") 25 private long totalBytesSent; 26 27 @GuardedBy("this") 28 private long totalBytesAckedByPeer; 29 30 // @GuardedBy("this") for writes but not reads. 31 private volatile boolean transmitWindowFull; 32 33 /** 34 * Creates a new instance of {@link FlowController}. 35 * 36 * @param maxUnackedBytes a weak limit on the number of bytes sent but not yet acknowledged 37 */ FlowController(int maxUnackedBytes)38 public FlowController(int maxUnackedBytes) { 39 this.maxUnackedBytes = maxUnackedBytes; 40 } 41 42 /** 43 * Returns true iff the number of reported bytes sent but not yet acknowledged is greater than or 44 * equal to {@code maxUnackedBytes}. 45 */ isTransmitWindowFull()46 public boolean isTransmitWindowFull() { 47 return transmitWindowFull; 48 } 49 50 /** 51 * Informs this flow controller that more data has been sent. 52 * 53 * @param numBytesSent a non-negative number of additional bytes sent 54 * @return true iff this report caused {@link #isTransmitWindowFull()} to transition to true 55 */ notifyBytesSent(long numBytesSent)56 public synchronized boolean notifyBytesSent(long numBytesSent) { 57 totalBytesSent += numBytesSent; 58 if ((totalBytesSent - totalBytesAckedByPeer) >= maxUnackedBytes && !transmitWindowFull) { 59 transmitWindowFull = true; 60 return true; 61 } 62 return false; 63 } 64 65 /** 66 * Processes an acknowledgement from our peer. 67 * 68 * @param numBytesAcked the total number of bytes ever received over this connection modulo 2^64, 69 * with the most significant bit of this value encoded in this argument's sign bit 70 * @return true iff this report caused {@link #isTransmitWindowFull()} to transition to false 71 */ handleAcknowledgedBytes(long numBytesAcked)72 public synchronized boolean handleAcknowledgedBytes(long numBytesAcked) { 73 totalBytesAckedByPeer = wrapAwareMax(totalBytesAckedByPeer, numBytesAcked); 74 if ((totalBytesSent - totalBytesAckedByPeer) < maxUnackedBytes && transmitWindowFull) { 75 transmitWindowFull = false; 76 return true; 77 } 78 return false; 79 } 80 wrapAwareMax(long a, long b)81 private static final long wrapAwareMax(long a, long b) { 82 return a - b < 0 ? b : a; 83 } 84 } 85