1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #ifndef TENSORFLOW_CORE_PLATFORM_CLOUD_GCS_FILE_SYSTEM_H_
17 #define TENSORFLOW_CORE_PLATFORM_CLOUD_GCS_FILE_SYSTEM_H_
18
19 #include <string>
20 #include <unordered_set>
21 #include <utility>
22 #include <vector>
23
24 #include "tensorflow/core/platform/cloud/auth_provider.h"
25 #include "tensorflow/core/platform/cloud/compute_engine_metadata_client.h"
26 #include "tensorflow/core/platform/cloud/compute_engine_zone_provider.h"
27 #include "tensorflow/core/platform/cloud/expiring_lru_cache.h"
28 #include "tensorflow/core/platform/cloud/file_block_cache.h"
29 #include "tensorflow/core/platform/cloud/gcs_dns_cache.h"
30 #include "tensorflow/core/platform/cloud/gcs_throttle.h"
31 #include "tensorflow/core/platform/cloud/http_request.h"
32 #include "tensorflow/core/platform/file_system.h"
33 #include "tensorflow/core/platform/retrying_file_system.h"
34 #include "tensorflow/core/platform/status.h"
35
36 namespace tensorflow {
37
38 class GcsFileSystem;
39
40 // The environment variable that overrides the block size for aligned reads from
41 // GCS. Specified in MB (e.g. "16" = 16 x 1024 x 1024 = 16777216 bytes).
42 constexpr char kBlockSize[] = "GCS_READ_CACHE_BLOCK_SIZE_MB";
43 #if defined(LIBTPU_ON_GCE)
44 // Overwrite the default max block size for `libtpu` BUILDs which do not
45 // offer a mechanism to override the default through environment variable.
46 constexpr size_t kDefaultBlockSize = 512 * 1024 * 1024;
47 #else
48 constexpr size_t kDefaultBlockSize = 64 * 1024 * 1024;
49 #endif
50 // The environment variable that overrides the max size of the LRU cache of
51 // blocks read from GCS. Specified in MB.
52 constexpr char kMaxCacheSize[] = "GCS_READ_CACHE_MAX_SIZE_MB";
53 #if defined(LIBTPU_ON_GCE)
54 // Overwrite the default max cache size for `libtpu` BUILDs which do not
55 // offer a mechanism to override the default through environment variable.
56 constexpr size_t kDefaultMaxCacheSize = 163840LL * 1024LL * 1024LL;
57 #else
58 constexpr size_t kDefaultMaxCacheSize = 0;
59 #endif
60 // The environment variable that overrides the maximum staleness of cached file
61 // contents. Once any block of a file reaches this staleness, all cached blocks
62 // will be evicted on the next read.
63 constexpr char kMaxStaleness[] = "GCS_READ_CACHE_MAX_STALENESS";
64 constexpr uint64 kDefaultMaxStaleness = 0;
65
66 // Helper function to extract an environment variable and convert it into a
67 // value of type T.
68 template <typename T>
GetEnvVar(const char * varname,bool (* convert)(StringPiece,T *),T * value)69 bool GetEnvVar(const char* varname, bool (*convert)(StringPiece, T*),
70 T* value) {
71 const char* env_value = std::getenv(varname);
72 if (env_value == nullptr) {
73 return false;
74 }
75 return convert(env_value, value);
76 }
77
78 /// GcsStatsInterface allows for instrumentation of the GCS file system.
79 ///
80 /// GcsStatsInterface and its subclasses must be safe to use from multiple
81 /// threads concurrently.
82 ///
83 /// WARNING! This is an experimental interface that may change or go away at any
84 /// time.
85 class GcsStatsInterface {
86 public:
87 /// Configure is called by the GcsFileSystem to provide instrumentation hooks.
88 ///
89 /// Note: Configure can be called multiple times (e.g. if the block cache is
90 /// re-initialized).
91 virtual void Configure(GcsFileSystem* fs, GcsThrottle* throttle,
92 const FileBlockCache* block_cache) = 0;
93
94 /// RecordBlockLoadRequest is called to record a block load request is about
95 /// to be made.
96 virtual void RecordBlockLoadRequest(const string& file, size_t offset) = 0;
97
98 /// RecordBlockRetrieved is called once a block within the file has been
99 /// retrieved.
100 virtual void RecordBlockRetrieved(const string& file, size_t offset,
101 size_t bytes_transferred) = 0;
102
103 // RecordStatObjectRequest is called once a statting object request over GCS
104 // is about to be made.
105 virtual void RecordStatObjectRequest() = 0;
106
107 /// HttpStats is called to optionally provide a RequestStats listener
108 /// to be annotated on every HTTP request made to the GCS API.
109 ///
110 /// HttpStats() may return nullptr.
111 virtual HttpRequest::RequestStats* HttpStats() = 0;
112
113 virtual ~GcsStatsInterface() = default;
114 };
115
/// Describes an upload session.
///
/// Both members have well-defined defaults, so a default-constructed handle
/// can be inspected safely before it has been filled in.
struct UploadSessionHandle {
  // URI identifying the upload session; empty until one is assigned.
  std::string session_uri;
  // Whether the session is resumable. Defaults to false so that a handle
  // declared as `UploadSessionHandle h;` never holds an indeterminate value.
  bool resumable = false;
};
120
121 /// Google Cloud Storage implementation of a file system.
122 ///
123 /// The clients should use RetryingGcsFileSystem defined below,
124 /// which adds retry logic to GCS operations.
125 class GcsFileSystem : public FileSystem {
126 public:
127 struct TimeoutConfig;
128
129 // Main constructor used (via RetryingFileSystem) throughout Tensorflow
130 explicit GcsFileSystem(bool make_default_cache = true);
131 // Used mostly for unit testing or use cases which need to customize the
132 // filesystem from defaults
133 GcsFileSystem(std::unique_ptr<AuthProvider> auth_provider,
134 std::unique_ptr<HttpRequest::Factory> http_request_factory,
135 std::unique_ptr<ZoneProvider> zone_provider, size_t block_size,
136 size_t max_bytes, uint64 max_staleness,
137 uint64 stat_cache_max_age, size_t stat_cache_max_entries,
138 uint64 matching_paths_cache_max_age,
139 size_t matching_paths_cache_max_entries,
140 RetryConfig retry_config, TimeoutConfig timeouts,
141 const std::unordered_set<string>& allowed_locations,
142 std::pair<const string, const string>* additional_header,
143 bool compose_append);
144
145 TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT;
146
147 Status NewRandomAccessFile(
148 const string& fname, TransactionToken* token,
149 std::unique_ptr<RandomAccessFile>* result) override;
150
151 Status NewWritableFile(const string& fname, TransactionToken* token,
152 std::unique_ptr<WritableFile>* result) override;
153
154 Status NewAppendableFile(const string& fname, TransactionToken* token,
155 std::unique_ptr<WritableFile>* result) override;
156
157 Status NewReadOnlyMemoryRegionFromFile(
158 const string& fname, TransactionToken* token,
159 std::unique_ptr<ReadOnlyMemoryRegion>* result) override;
160
161 Status FileExists(const string& fname, TransactionToken* token) override;
162
163 Status Stat(const string& fname, TransactionToken* token,
164 FileStatistics* stat) override;
165
166 Status GetChildren(const string& dir, TransactionToken* token,
167 std::vector<string>* result) override;
168
169 Status GetMatchingPaths(const string& pattern, TransactionToken* token,
170 std::vector<string>* results) override;
171
172 Status DeleteFile(const string& fname, TransactionToken* token) override;
173
174 Status CreateDir(const string& dirname, TransactionToken* token) override;
175
176 Status DeleteDir(const string& dirname, TransactionToken* token) override;
177
178 Status GetFileSize(const string& fname, TransactionToken* token,
179 uint64* file_size) override;
180
181 Status RenameFile(const string& src, const string& target,
182 TransactionToken* token) override;
183
184 Status IsDirectory(const string& fname, TransactionToken* token) override;
185
186 Status DeleteRecursively(const string& dirname, TransactionToken* token,
187 int64_t* undeleted_files,
188 int64_t* undeleted_dirs) override;
189
190 void FlushCaches(TransactionToken* token) override;
191
192 /// Set an object to collect runtime statistics from the GcsFilesystem.
193 void SetStats(GcsStatsInterface* stats);
194
195 /// Set an object to collect file block cache stats.
196 void SetCacheStats(FileBlockCacheStatsInterface* cache_stats);
197
198 /// These accessors are mainly for testing purposes, to verify that the
199 /// environment variables that control these parameters are handled correctly.
block_size()200 size_t block_size() {
201 tf_shared_lock l(block_cache_lock_);
202 return file_block_cache_->block_size();
203 }
max_bytes()204 size_t max_bytes() {
205 tf_shared_lock l(block_cache_lock_);
206 return file_block_cache_->max_bytes();
207 }
max_staleness()208 uint64 max_staleness() {
209 tf_shared_lock l(block_cache_lock_);
210 return file_block_cache_->max_staleness();
211 }
timeouts()212 TimeoutConfig timeouts() const { return timeouts_; }
allowed_locations()213 std::unordered_set<string> allowed_locations() const {
214 return allowed_locations_;
215 }
216
compose_append()217 bool compose_append() const { return compose_append_; }
additional_header_name()218 string additional_header_name() const {
219 return additional_header_ ? additional_header_->first : "";
220 }
additional_header_value()221 string additional_header_value() const {
222 return additional_header_ ? additional_header_->second : "";
223 }
224
stat_cache_max_age()225 uint64 stat_cache_max_age() const { return stat_cache_->max_age(); }
stat_cache_max_entries()226 size_t stat_cache_max_entries() const { return stat_cache_->max_entries(); }
227
matching_paths_cache_max_age()228 uint64 matching_paths_cache_max_age() const {
229 return matching_paths_cache_->max_age();
230 }
matching_paths_cache_max_entries()231 size_t matching_paths_cache_max_entries() const {
232 return matching_paths_cache_->max_entries();
233 }
234
235 /// Structure containing the information for timeouts related to accessing the
236 /// GCS APIs.
237 ///
238 /// All values are in seconds.
239 struct TimeoutConfig {
240 // The request connection timeout. If a connection cannot be established
241 // within `connect` seconds, abort the request.
242 uint32 connect = 120; // 2 minutes
243
244 // The request idle timeout. If a request has seen no activity in `idle`
245 // seconds, abort the request.
246 uint32 idle = 60; // 1 minute
247
248 // The maximum total time a metadata request can take. If a request has not
249 // completed within `metadata` seconds, the request is aborted.
250 uint32 metadata = 3600; // 1 hour
251
252 // The maximum total time a block read request can take. If a request has
253 // not completed within `read` seconds, the request is aborted.
254 uint32 read = 3600; // 1 hour
255
256 // The maximum total time an upload request can take. If a request has not
257 // completed within `write` seconds, the request is aborted.
258 uint32 write = 3600; // 1 hour
259
TimeoutConfigTimeoutConfig260 TimeoutConfig() {}
TimeoutConfigTimeoutConfig261 TimeoutConfig(uint32 connect, uint32 idle, uint32 metadata, uint32 read,
262 uint32 write)
263 : connect(connect),
264 idle(idle),
265 metadata(metadata),
266 read(read),
267 write(write) {}
268 };
269
270 Status CreateHttpRequest(std::unique_ptr<HttpRequest>* request);
271
272 /// \brief Sets a new AuthProvider on the GCS FileSystem.
273 ///
274 /// The new auth provider will be used for all subsequent requests.
275 void SetAuthProvider(std::unique_ptr<AuthProvider> auth_provider);
276
277 /// \brief Resets the block cache and re-instantiates it with the new values.
278 ///
279 /// This method can be used to clear the existing block cache and/or to
280 /// re-configure the block cache for different values.
281 ///
282 /// Note: the existing block cache is not cleaned up until all existing files
283 /// have been closed.
284 void ResetFileBlockCache(size_t block_size_bytes, size_t max_bytes,
285 uint64 max_staleness_secs);
286
287 protected:
288 virtual std::unique_ptr<FileBlockCache> MakeFileBlockCache(
289 size_t block_size, size_t max_bytes, uint64 max_staleness);
290
291 /// Loads file contents from GCS for a given filename, offset, and length.
292 virtual Status LoadBufferFromGCS(const string& fname, size_t offset, size_t n,
293 char* buffer, size_t* bytes_transferred);
294
295 // Creates an upload session for an upcoming GCS object upload.
296 virtual Status CreateNewUploadSession(uint64 start_offset,
297 const std::string& object_to_upload,
298 const std::string& bucket,
299 uint64 file_size,
300 const std::string& gcs_path,
301 UploadSessionHandle* session_handle);
302
303 // Uploads object data to session.
304 virtual Status UploadToSession(const std::string& session_uri,
305 uint64 start_offset, uint64 already_uploaded,
306 const std::string& tmp_content_filename,
307 uint64 file_size,
308 const std::string& file_path);
309
310 /// \brief Requests status of a previously initiated upload session.
311 ///
312 /// If the upload has already succeeded, sets 'completed' to true.
313 /// Otherwise sets 'completed' to false and 'uploaded' to the currently
314 /// uploaded size in bytes.
315 virtual Status RequestUploadSessionStatus(const string& session_uri,
316 uint64 file_size,
317 const std::string& gcs_path,
318 bool* completed, uint64* uploaded);
319
320 Status ParseGcsPathForScheme(StringPiece fname, string scheme,
321 bool empty_object_ok, string* bucket,
322 string* object);
323
324 /// \brief Splits a GCS path to a bucket and an object.
325 ///
326 /// For example, "gs://bucket-name/path/to/file.txt" gets split into
327 /// "bucket-name" and "path/to/file.txt".
328 /// If fname only contains the bucket and empty_object_ok = true, the returned
329 /// object is empty.
330 virtual Status ParseGcsPath(StringPiece fname, bool empty_object_ok,
331 string* bucket, string* object);
332
333 std::shared_ptr<ComputeEngineMetadataClient> compute_engine_metadata_client_;
334
335 // Used by a subclass.
336 TimeoutConfig timeouts_;
337
338 /// The retry configuration used for retrying failed calls.
339 RetryConfig retry_config_;
340
341 private:
342 // GCS file statistics.
343 struct GcsFileStat {
344 FileStatistics base;
345 int64_t generation_number = 0;
346 };
347
348 /// \brief Checks if the bucket exists. Returns OK if the check succeeded.
349 ///
350 /// 'result' is set if the function returns OK. 'result' cannot be nullptr.
351 Status BucketExists(const string& bucket, bool* result);
352
353 /// \brief Retrieves the GCS bucket location. Returns OK if the location was
354 /// retrieved.
355 ///
356 /// Given a string bucket the GCS bucket metadata API will be called and the
357 /// location string filled with the location of the bucket.
358 ///
359 /// This requires the bucket metadata permission.
360 /// Repeated calls for the same bucket are cached so this function can be
361 /// called frequently without causing an extra API call
362 Status GetBucketLocation(const string& bucket, string* location);
363
364 /// \brief Check if the GCS buckets location is allowed with the current
365 /// constraint configuration
366 Status CheckBucketLocationConstraint(const string& bucket);
367
368 /// \brief Given the input bucket `bucket`, fills `result_buffer` with the
369 /// results of the metadata. Returns OK if the API call succeeds without
370 /// error.
371 Status GetBucketMetadata(const string& bucket,
372 std::vector<char>* result_buffer);
373
374 /// \brief Checks if the object exists. Returns OK if the check succeeded.
375 ///
376 /// 'result' is set if the function returns OK. 'result' cannot be nullptr.
377 Status ObjectExists(const string& fname, const string& bucket,
378 const string& object, bool* result);
379
380 /// \brief Checks if the folder exists. Returns OK if the check succeeded.
381 ///
382 /// 'result' is set if the function returns OK. 'result' cannot be nullptr.
383 Status FolderExists(const string& dirname, bool* result);
384
385 /// \brief Internal version of GetChildren with more knobs.
386 ///
387 /// If 'recursively' is true, returns all objects in all subfolders.
388 /// Otherwise only returns the immediate children in the directory.
389 ///
390 /// If 'include_self_directory_marker' is true and there is a GCS directory
391 /// marker at the path 'dir', GetChildrenBound will return an empty string
392 /// as one of the children that represents this marker.
393 Status GetChildrenBounded(const string& dir, uint64 max_results,
394 std::vector<string>* result, bool recursively,
395 bool include_self_directory_marker);
396
397 /// Retrieves file statistics assuming fname points to a GCS object. The data
398 /// may be read from cache or from GCS directly.
399 Status StatForObject(const string& fname, const string& bucket,
400 const string& object, GcsFileStat* stat);
401 /// Retrieves file statistics of file fname directly from GCS.
402 Status UncachedStatForObject(const string& fname, const string& bucket,
403 const string& object, GcsFileStat* stat);
404
405 Status RenameObject(const string& src, const string& target);
406
407 // Clear all the caches related to the file with name `filename`.
408 void ClearFileCaches(const string& fname);
409
410 mutex mu_;
411 std::unique_ptr<AuthProvider> auth_provider_ TF_GUARDED_BY(mu_);
412 std::shared_ptr<HttpRequest::Factory> http_request_factory_;
413 std::unique_ptr<ZoneProvider> zone_provider_;
414
415 // Reads smaller than block_size_ will trigger a read of block_size_.
416 uint64 block_size_;
417
418 // block_cache_lock_ protects the file_block_cache_ pointer (Note that
419 // FileBlockCache instances are themselves threadsafe).
420 mutex block_cache_lock_;
421 std::unique_ptr<FileBlockCache> file_block_cache_
422 TF_GUARDED_BY(block_cache_lock_);
423
424 bool cache_enabled_;
425 std::unique_ptr<GcsDnsCache> dns_cache_;
426 GcsThrottle throttle_;
427
428 using StatCache = ExpiringLRUCache<GcsFileStat>;
429 std::unique_ptr<StatCache> stat_cache_;
430
431 using MatchingPathsCache = ExpiringLRUCache<std::vector<string>>;
432 std::unique_ptr<MatchingPathsCache> matching_paths_cache_;
433
434 using BucketLocationCache = ExpiringLRUCache<string>;
435 std::unique_ptr<BucketLocationCache> bucket_location_cache_;
436 std::unordered_set<string> allowed_locations_;
437 bool compose_append_;
438
439 GcsStatsInterface* stats_ = nullptr; // Not owned.
440
441 // Additional header material to be transmitted with all GCS requests
442 std::unique_ptr<std::pair<const string, const string>> additional_header_;
443
444 TF_DISALLOW_COPY_AND_ASSIGN(GcsFileSystem);
445 };
446
447 /// Google Cloud Storage implementation of a file system with retry on failures.
448 class RetryingGcsFileSystem : public RetryingFileSystem<GcsFileSystem> {
449 public:
RetryingGcsFileSystem()450 RetryingGcsFileSystem()
451 : RetryingFileSystem(std::unique_ptr<GcsFileSystem>(new GcsFileSystem),
452 RetryConfig(100000 /* init_delay_time_us */)) {}
453 };
454
455 } // namespace tensorflow
456
457 #endif // TENSORFLOW_CORE_PLATFORM_CLOUD_GCS_FILE_SYSTEM_H_
458