/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_CORE_PLATFORM_CLOUD_GCS_THROTTLE_H_
#define TENSORFLOW_CORE_PLATFORM_CLOUD_GCS_THROTTLE_H_

#include "tensorflow/core/platform/env.h"

namespace tensorflow {

/**
 * GcsThrottleConfig is used to configure the GcsThrottle.
 */
struct GcsThrottleConfig {
  /**
   * enabled is true if GcsThrottle should throttle requests, false otherwise.
   */
  bool enabled = false;

  /**
   * token_rate is the number of tokens accrued every second that can be used
   * for making requests to the GCS service.
   */
  int64_t token_rate =
      100000;  // ~800 Mbit/s when spent on response bandwidth alone.

  /**
   * bucket_size is the maximum number of available tokens the GcsThrottle can
   * accrue.
   */
  int64_t bucket_size = 10000000;  // 10 million tokens total

  /**
   * tokens_per_request determines the number of tokens consumed for every
   * request.
   *
   * Note: tokens are also consumed in proportion to the response size.
   */
  int64_t tokens_per_request = 100;

  /**
   * initial_tokens determines how many tokens should be available immediately
   * after the GcsThrottle is constructed.
   */
  int64_t initial_tokens = 0;
};
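
// A minimal configuration sketch (the values below are illustrative, not
// recommendations); GcsThrottle and GcsThrottle::SetConfig are declared
// further down in this header:
//
//   GcsThrottleConfig config;
//   config.enabled = true;
//   config.token_rate = 200000;      // ~1.6 Gbit/s of response bandwidth.
//   config.bucket_size = 20000000;   // Allow a burst of up to 20M tokens.
//   config.initial_tokens = config.bucket_size;  // Start with a full bucket.
//
//   GcsThrottle throttle;
//   throttle.SetConfig(config);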

/**
 * GcsThrottle is used to ensure fair use of the available GCS capacity.
 *
 * GcsThrottle operates around a concept of tokens. Tokens are consumed when
 * making requests to the GCS service. Tokens are consumed based both on the
 * number of requests made and on the bandwidth consumed (response sizes).
 *
 * GcsThrottle is thread safe and can be used from multiple threads.
 */
class GcsThrottle {
 public:
  /**
   * Constructs a GcsThrottle.
   */
  explicit GcsThrottle(EnvTime* env_time = nullptr);

  /**
   * AdmitRequest updates the GcsThrottle to record that a request will be
   * made.
   *
   * AdmitRequest should be called before any request is made. AdmitRequest
   * returns false if the request should be denied; in that case no tokens are
   * consumed. If true is returned, the configured number of tokens is
   * consumed.
   */
  bool AdmitRequest();

  /**
   * RecordResponse updates the GcsThrottle to record that a request has
   * completed.
   *
   * RecordResponse should be called after the response has been received.
   * RecordResponse will update the internal state based on the number of bytes
   * in the response.
   *
   * Note: we split up the request and the response in this fashion in order to
   * avoid penalizing consumers who are using large readahead buffers at higher
   * layers of the I/O stack.
   */
  void RecordResponse(size_t num_bytes);
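
  // A minimal sketch of the intended call pattern, assuming a GcsThrottle*
  // named throttle and a hypothetical IssueGcsRequest() helper that returns
  // the response body:
  //
  //   if (!throttle->AdmitRequest()) {
  //     return errors::Unavailable("Request throttled.");
  //   }
  //   string response_body = IssueGcsRequest();
  //   throttle->RecordResponse(response_body.size());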

  /**
   * SetConfig sets the configuration for GcsThrottle and re-initializes state.
   *
   * After calling this, the token pool will be config.initial_tokens.
   */
  void SetConfig(GcsThrottleConfig config);

  /**
   * available_tokens gives a snapshot of how many tokens are available.
   *
   * The returned value should not be used to make admission decisions. The
   * purpose of this function is to expose the number of available tokens in
   * the pool to monitoring or other instrumentation.
   */
  inline int64_t available_tokens() TF_LOCKS_EXCLUDED(mu_) {
    mutex_lock l(mu_);
    UpdateState();
    return available_tokens_;
  }

  /**
   * is_enabled determines if the throttle is enabled.
   *
   * If !is_enabled(), AdmitRequest() will always return true. To enable the
   * throttle, call SetConfig passing in a configuration that has enabled set
   * to true.
   */
  bool is_enabled() TF_LOCKS_EXCLUDED(mu_) {
    mutex_lock l(mu_);
    return config_.enabled;
  }

 private:
  /**
   * UpdateState updates the available_tokens_ and last_updated_secs_ variables.
   *
   * UpdateState should be called in order to mark the passage of time, and
   * therefore add tokens to the available_tokens_ pool.
   */
  void UpdateState() TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);
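  // Note: given token_rate and bucket_size in GcsThrottleConfig, the refill
  // performed here is expected to be roughly
  //   available_tokens_ = min(bucket_size,
  //                           available_tokens_ + elapsed_secs * token_rate);
  // the exact arithmetic lives in the accompanying gcs_throttle.cc.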

  inline uint64 request_bytes_to_tokens(size_t num_bytes) {
    return num_bytes >> 10;
  }
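  // Worked example: a 1 MB (1,000,000 byte) response maps to
  // 1000000 >> 10 = 976 tokens, charged in addition to tokens_per_request.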

  mutex mu_;

  /**
   * last_updated_secs_ records the time, in seconds since the Unix epoch, at
   * which the internal state of the GcsThrottle was last updated. This is used
   * when determining the number of tokens to add to the available_tokens_
   * pool.
   */
  uint64 last_updated_secs_ TF_GUARDED_BY(mu_) = 0;

  /**
   * available_tokens_ records how many tokens are available to be consumed.
   *
   * Note: it is possible for available_tokens_ to become negative. If a
   * response comes back that consumes more than the available tokens, the
   * count will go negative, and future requests will be blocked until tokens
   * become available again.
   */
  int64_t available_tokens_ TF_GUARDED_BY(mu_) = 0;

  EnvTime* const env_time_;
  GcsThrottleConfig config_ TF_GUARDED_BY(mu_);
};

}  // namespace tensorflow

#endif  // TENSORFLOW_CORE_PLATFORM_CLOUD_GCS_THROTTLE_H_