xref: /aosp_15_r20/external/tensorflow/tensorflow/core/platform/cloud/gcs_file_system.h (revision b6fb3261f9314811a0f4371741dbb8839866f948)
1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #ifndef TENSORFLOW_CORE_PLATFORM_CLOUD_GCS_FILE_SYSTEM_H_
17 #define TENSORFLOW_CORE_PLATFORM_CLOUD_GCS_FILE_SYSTEM_H_
18 
#include <cstdlib>
#include <memory>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

#include "tensorflow/core/platform/cloud/auth_provider.h"
#include "tensorflow/core/platform/cloud/compute_engine_metadata_client.h"
#include "tensorflow/core/platform/cloud/compute_engine_zone_provider.h"
#include "tensorflow/core/platform/cloud/expiring_lru_cache.h"
#include "tensorflow/core/platform/cloud/file_block_cache.h"
#include "tensorflow/core/platform/cloud/gcs_dns_cache.h"
#include "tensorflow/core/platform/cloud/gcs_throttle.h"
#include "tensorflow/core/platform/cloud/http_request.h"
#include "tensorflow/core/platform/file_system.h"
#include "tensorflow/core/platform/retrying_file_system.h"
#include "tensorflow/core/platform/status.h"
35 
36 namespace tensorflow {
37 
38 class GcsFileSystem;
39 
40 // The environment variable that overrides the block size for aligned reads from
41 // GCS. Specified in MB (e.g. "16" = 16 x 1024 x 1024 = 16777216 bytes).
42 constexpr char kBlockSize[] = "GCS_READ_CACHE_BLOCK_SIZE_MB";
43 #if defined(LIBTPU_ON_GCE)
44 // Overwrite the default max block size for `libtpu` BUILDs which do not
45 // offer a mechanism to override the default through environment variable.
46 constexpr size_t kDefaultBlockSize = 512 * 1024 * 1024;
47 #else
48 constexpr size_t kDefaultBlockSize = 64 * 1024 * 1024;
49 #endif
50 // The environment variable that overrides the max size of the LRU cache of
51 // blocks read from GCS. Specified in MB.
52 constexpr char kMaxCacheSize[] = "GCS_READ_CACHE_MAX_SIZE_MB";
53 #if defined(LIBTPU_ON_GCE)
54 // Overwrite the default max cache size for `libtpu` BUILDs which do not
55 // offer a mechanism to override the default through environment variable.
56 constexpr size_t kDefaultMaxCacheSize = 163840LL * 1024LL * 1024LL;
57 #else
58 constexpr size_t kDefaultMaxCacheSize = 0;
59 #endif
60 // The environment variable that overrides the maximum staleness of cached file
61 // contents. Once any block of a file reaches this staleness, all cached blocks
62 // will be evicted on the next read.
63 constexpr char kMaxStaleness[] = "GCS_READ_CACHE_MAX_STALENESS";
64 constexpr uint64 kDefaultMaxStaleness = 0;
65 
66 // Helper function to extract an environment variable and convert it into a
67 // value of type T.
68 template <typename T>
GetEnvVar(const char * varname,bool (* convert)(StringPiece,T *),T * value)69 bool GetEnvVar(const char* varname, bool (*convert)(StringPiece, T*),
70                T* value) {
71   const char* env_value = std::getenv(varname);
72   if (env_value == nullptr) {
73     return false;
74   }
75   return convert(env_value, value);
76 }
77 
/// GcsStatsInterface allows for instrumentation of the GCS file system.
///
/// GcsStatsInterface and its subclasses must be safe to use from multiple
/// threads concurrently.
///
/// WARNING! This is an experimental interface that may change or go away at any
/// time.
class GcsStatsInterface {
 public:
  /// Configure is called by the GcsFileSystem to provide instrumentation hooks.
  ///
  /// Note: Configure can be called multiple times (e.g. if the block cache is
  /// re-initialized), so implementations should tolerate being reconfigured.
  ///
  /// @param fs          The GcsFileSystem being instrumented.
  /// @param throttle    A GcsThrottle the implementation may observe
  ///                    (presumably `fs`'s own throttle -- TODO confirm).
  /// @param block_cache The current block cache (pointer-to-const; whether it
  ///                    may be nullptr is not visible here -- TODO confirm).
  virtual void Configure(GcsFileSystem* fs, GcsThrottle* throttle,
                         const FileBlockCache* block_cache) = 0;

  /// RecordBlockLoadRequest is called to record that a block load request is
  /// about to be made for `file` at byte `offset`.
  virtual void RecordBlockLoadRequest(const string& file, size_t offset) = 0;

  /// RecordBlockRetrieved is called once a block within the file has been
  /// retrieved; `bytes_transferred` is the number of bytes received for it.
  virtual void RecordBlockRetrieved(const string& file, size_t offset,
                                    size_t bytes_transferred) = 0;

  /// RecordStatObjectRequest is called once a stat-object request to GCS is
  /// about to be made.
  virtual void RecordStatObjectRequest() = 0;

  /// HttpStats is called to optionally provide a RequestStats listener
  /// to be annotated on every HTTP request made to the GCS API.
  ///
  /// HttpStats() may return nullptr.
  virtual HttpRequest::RequestStats* HttpStats() = 0;

  // Virtual so subclasses are destroyed correctly through a base pointer.
  virtual ~GcsStatsInterface() = default;
};
115 
/// Handle describing a GCS upload session.
///
/// Members carry default initializers so a default-constructed handle is in a
/// well-defined state (empty URI, not resumable) instead of holding an
/// indeterminate `resumable` flag. The type remains an aggregate, so
/// `UploadSessionHandle{uri, true}` still works.
struct UploadSessionHandle {
  // URI of the upload session.
  std::string session_uri;
  // Whether the session is resumable.
  bool resumable = false;
};
120 
121 /// Google Cloud Storage implementation of a file system.
122 ///
123 /// The clients should use RetryingGcsFileSystem defined below,
124 /// which adds retry logic to GCS operations.
125 class GcsFileSystem : public FileSystem {
126  public:
127   struct TimeoutConfig;
128 
129   // Main constructor used (via RetryingFileSystem) throughout Tensorflow
130   explicit GcsFileSystem(bool make_default_cache = true);
131   // Used mostly for unit testing or use cases which need to customize the
132   // filesystem from defaults
133   GcsFileSystem(std::unique_ptr<AuthProvider> auth_provider,
134                 std::unique_ptr<HttpRequest::Factory> http_request_factory,
135                 std::unique_ptr<ZoneProvider> zone_provider, size_t block_size,
136                 size_t max_bytes, uint64 max_staleness,
137                 uint64 stat_cache_max_age, size_t stat_cache_max_entries,
138                 uint64 matching_paths_cache_max_age,
139                 size_t matching_paths_cache_max_entries,
140                 RetryConfig retry_config, TimeoutConfig timeouts,
141                 const std::unordered_set<string>& allowed_locations,
142                 std::pair<const string, const string>* additional_header,
143                 bool compose_append);
144 
145   TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT;
146 
147   Status NewRandomAccessFile(
148       const string& fname, TransactionToken* token,
149       std::unique_ptr<RandomAccessFile>* result) override;
150 
151   Status NewWritableFile(const string& fname, TransactionToken* token,
152                          std::unique_ptr<WritableFile>* result) override;
153 
154   Status NewAppendableFile(const string& fname, TransactionToken* token,
155                            std::unique_ptr<WritableFile>* result) override;
156 
157   Status NewReadOnlyMemoryRegionFromFile(
158       const string& fname, TransactionToken* token,
159       std::unique_ptr<ReadOnlyMemoryRegion>* result) override;
160 
161   Status FileExists(const string& fname, TransactionToken* token) override;
162 
163   Status Stat(const string& fname, TransactionToken* token,
164               FileStatistics* stat) override;
165 
166   Status GetChildren(const string& dir, TransactionToken* token,
167                      std::vector<string>* result) override;
168 
169   Status GetMatchingPaths(const string& pattern, TransactionToken* token,
170                           std::vector<string>* results) override;
171 
172   Status DeleteFile(const string& fname, TransactionToken* token) override;
173 
174   Status CreateDir(const string& dirname, TransactionToken* token) override;
175 
176   Status DeleteDir(const string& dirname, TransactionToken* token) override;
177 
178   Status GetFileSize(const string& fname, TransactionToken* token,
179                      uint64* file_size) override;
180 
181   Status RenameFile(const string& src, const string& target,
182                     TransactionToken* token) override;
183 
184   Status IsDirectory(const string& fname, TransactionToken* token) override;
185 
186   Status DeleteRecursively(const string& dirname, TransactionToken* token,
187                            int64_t* undeleted_files,
188                            int64_t* undeleted_dirs) override;
189 
190   void FlushCaches(TransactionToken* token) override;
191 
192   /// Set an object to collect runtime statistics from the GcsFilesystem.
193   void SetStats(GcsStatsInterface* stats);
194 
195   /// Set an object to collect file block cache stats.
196   void SetCacheStats(FileBlockCacheStatsInterface* cache_stats);
197 
198   /// These accessors are mainly for testing purposes, to verify that the
199   /// environment variables that control these parameters are handled correctly.
block_size()200   size_t block_size() {
201     tf_shared_lock l(block_cache_lock_);
202     return file_block_cache_->block_size();
203   }
max_bytes()204   size_t max_bytes() {
205     tf_shared_lock l(block_cache_lock_);
206     return file_block_cache_->max_bytes();
207   }
max_staleness()208   uint64 max_staleness() {
209     tf_shared_lock l(block_cache_lock_);
210     return file_block_cache_->max_staleness();
211   }
timeouts()212   TimeoutConfig timeouts() const { return timeouts_; }
allowed_locations()213   std::unordered_set<string> allowed_locations() const {
214     return allowed_locations_;
215   }
216 
compose_append()217   bool compose_append() const { return compose_append_; }
additional_header_name()218   string additional_header_name() const {
219     return additional_header_ ? additional_header_->first : "";
220   }
additional_header_value()221   string additional_header_value() const {
222     return additional_header_ ? additional_header_->second : "";
223   }
224 
stat_cache_max_age()225   uint64 stat_cache_max_age() const { return stat_cache_->max_age(); }
stat_cache_max_entries()226   size_t stat_cache_max_entries() const { return stat_cache_->max_entries(); }
227 
matching_paths_cache_max_age()228   uint64 matching_paths_cache_max_age() const {
229     return matching_paths_cache_->max_age();
230   }
matching_paths_cache_max_entries()231   size_t matching_paths_cache_max_entries() const {
232     return matching_paths_cache_->max_entries();
233   }
234 
235   /// Structure containing the information for timeouts related to accessing the
236   /// GCS APIs.
237   ///
238   /// All values are in seconds.
239   struct TimeoutConfig {
240     // The request connection timeout. If a connection cannot be established
241     // within `connect` seconds, abort the request.
242     uint32 connect = 120;  // 2 minutes
243 
244     // The request idle timeout. If a request has seen no activity in `idle`
245     // seconds, abort the request.
246     uint32 idle = 60;  // 1 minute
247 
248     // The maximum total time a metadata request can take. If a request has not
249     // completed within `metadata` seconds, the request is aborted.
250     uint32 metadata = 3600;  // 1 hour
251 
252     // The maximum total time a block read request can take. If a request has
253     // not completed within `read` seconds, the request is aborted.
254     uint32 read = 3600;  // 1 hour
255 
256     // The maximum total time an upload request can take. If a request has not
257     // completed within `write` seconds, the request is aborted.
258     uint32 write = 3600;  // 1 hour
259 
TimeoutConfigTimeoutConfig260     TimeoutConfig() {}
TimeoutConfigTimeoutConfig261     TimeoutConfig(uint32 connect, uint32 idle, uint32 metadata, uint32 read,
262                   uint32 write)
263         : connect(connect),
264           idle(idle),
265           metadata(metadata),
266           read(read),
267           write(write) {}
268   };
269 
270   Status CreateHttpRequest(std::unique_ptr<HttpRequest>* request);
271 
272   /// \brief Sets a new AuthProvider on the GCS FileSystem.
273   ///
274   /// The new auth provider will be used for all subsequent requests.
275   void SetAuthProvider(std::unique_ptr<AuthProvider> auth_provider);
276 
277   /// \brief Resets the block cache and re-instantiates it with the new values.
278   ///
279   /// This method can be used to clear the existing block cache and/or to
280   /// re-configure the block cache for different values.
281   ///
282   /// Note: the existing block cache is not cleaned up until all existing files
283   /// have been closed.
284   void ResetFileBlockCache(size_t block_size_bytes, size_t max_bytes,
285                            uint64 max_staleness_secs);
286 
287  protected:
288   virtual std::unique_ptr<FileBlockCache> MakeFileBlockCache(
289       size_t block_size, size_t max_bytes, uint64 max_staleness);
290 
291   /// Loads file contents from GCS for a given filename, offset, and length.
292   virtual Status LoadBufferFromGCS(const string& fname, size_t offset, size_t n,
293                                    char* buffer, size_t* bytes_transferred);
294 
295   // Creates an upload session for an upcoming GCS object upload.
296   virtual Status CreateNewUploadSession(uint64 start_offset,
297                                         const std::string& object_to_upload,
298                                         const std::string& bucket,
299                                         uint64 file_size,
300                                         const std::string& gcs_path,
301                                         UploadSessionHandle* session_handle);
302 
303   // Uploads object data to session.
304   virtual Status UploadToSession(const std::string& session_uri,
305                                  uint64 start_offset, uint64 already_uploaded,
306                                  const std::string& tmp_content_filename,
307                                  uint64 file_size,
308                                  const std::string& file_path);
309 
310   /// \brief Requests status of a previously initiated upload session.
311   ///
312   /// If the upload has already succeeded, sets 'completed' to true.
313   /// Otherwise sets 'completed' to false and 'uploaded' to the currently
314   /// uploaded size in bytes.
315   virtual Status RequestUploadSessionStatus(const string& session_uri,
316                                             uint64 file_size,
317                                             const std::string& gcs_path,
318                                             bool* completed, uint64* uploaded);
319 
320   Status ParseGcsPathForScheme(StringPiece fname, string scheme,
321                                bool empty_object_ok, string* bucket,
322                                string* object);
323 
324   /// \brief Splits a GCS path to a bucket and an object.
325   ///
326   /// For example, "gs://bucket-name/path/to/file.txt" gets split into
327   /// "bucket-name" and "path/to/file.txt".
328   /// If fname only contains the bucket and empty_object_ok = true, the returned
329   /// object is empty.
330   virtual Status ParseGcsPath(StringPiece fname, bool empty_object_ok,
331                               string* bucket, string* object);
332 
333   std::shared_ptr<ComputeEngineMetadataClient> compute_engine_metadata_client_;
334 
335   // Used by a subclass.
336   TimeoutConfig timeouts_;
337 
338   /// The retry configuration used for retrying failed calls.
339   RetryConfig retry_config_;
340 
341  private:
342   // GCS file statistics.
343   struct GcsFileStat {
344     FileStatistics base;
345     int64_t generation_number = 0;
346   };
347 
348   /// \brief Checks if the bucket exists. Returns OK if the check succeeded.
349   ///
350   /// 'result' is set if the function returns OK. 'result' cannot be nullptr.
351   Status BucketExists(const string& bucket, bool* result);
352 
353   /// \brief Retrieves the GCS bucket location. Returns OK if the location was
354   /// retrieved.
355   ///
356   /// Given a string bucket the GCS bucket metadata API will be called and the
357   /// location string filled with the location of the bucket.
358   ///
359   /// This requires the bucket metadata permission.
360   /// Repeated calls for the same bucket are cached so this function can be
361   /// called frequently without causing an extra API call
362   Status GetBucketLocation(const string& bucket, string* location);
363 
364   /// \brief Check if the GCS buckets location is allowed with the current
365   /// constraint configuration
366   Status CheckBucketLocationConstraint(const string& bucket);
367 
368   /// \brief Given the input bucket `bucket`, fills `result_buffer` with the
369   /// results of the metadata. Returns OK if the API call succeeds without
370   /// error.
371   Status GetBucketMetadata(const string& bucket,
372                            std::vector<char>* result_buffer);
373 
374   /// \brief Checks if the object exists. Returns OK if the check succeeded.
375   ///
376   /// 'result' is set if the function returns OK. 'result' cannot be nullptr.
377   Status ObjectExists(const string& fname, const string& bucket,
378                       const string& object, bool* result);
379 
380   /// \brief Checks if the folder exists. Returns OK if the check succeeded.
381   ///
382   /// 'result' is set if the function returns OK. 'result' cannot be nullptr.
383   Status FolderExists(const string& dirname, bool* result);
384 
385   /// \brief Internal version of GetChildren with more knobs.
386   ///
387   /// If 'recursively' is true, returns all objects in all subfolders.
388   /// Otherwise only returns the immediate children in the directory.
389   ///
390   /// If 'include_self_directory_marker' is true and there is a GCS directory
391   /// marker at the path 'dir', GetChildrenBound will return an empty string
392   /// as one of the children that represents this marker.
393   Status GetChildrenBounded(const string& dir, uint64 max_results,
394                             std::vector<string>* result, bool recursively,
395                             bool include_self_directory_marker);
396 
397   /// Retrieves file statistics assuming fname points to a GCS object. The data
398   /// may be read from cache or from GCS directly.
399   Status StatForObject(const string& fname, const string& bucket,
400                        const string& object, GcsFileStat* stat);
401   /// Retrieves file statistics of file fname directly from GCS.
402   Status UncachedStatForObject(const string& fname, const string& bucket,
403                                const string& object, GcsFileStat* stat);
404 
405   Status RenameObject(const string& src, const string& target);
406 
407   // Clear all the caches related to the file with name `filename`.
408   void ClearFileCaches(const string& fname);
409 
410   mutex mu_;
411   std::unique_ptr<AuthProvider> auth_provider_ TF_GUARDED_BY(mu_);
412   std::shared_ptr<HttpRequest::Factory> http_request_factory_;
413   std::unique_ptr<ZoneProvider> zone_provider_;
414 
415   // Reads smaller than block_size_ will trigger a read of block_size_.
416   uint64 block_size_;
417 
418   // block_cache_lock_ protects the file_block_cache_ pointer (Note that
419   // FileBlockCache instances are themselves threadsafe).
420   mutex block_cache_lock_;
421   std::unique_ptr<FileBlockCache> file_block_cache_
422       TF_GUARDED_BY(block_cache_lock_);
423 
424   bool cache_enabled_;
425   std::unique_ptr<GcsDnsCache> dns_cache_;
426   GcsThrottle throttle_;
427 
428   using StatCache = ExpiringLRUCache<GcsFileStat>;
429   std::unique_ptr<StatCache> stat_cache_;
430 
431   using MatchingPathsCache = ExpiringLRUCache<std::vector<string>>;
432   std::unique_ptr<MatchingPathsCache> matching_paths_cache_;
433 
434   using BucketLocationCache = ExpiringLRUCache<string>;
435   std::unique_ptr<BucketLocationCache> bucket_location_cache_;
436   std::unordered_set<string> allowed_locations_;
437   bool compose_append_;
438 
439   GcsStatsInterface* stats_ = nullptr;  // Not owned.
440 
441   // Additional header material to be transmitted with all GCS requests
442   std::unique_ptr<std::pair<const string, const string>> additional_header_;
443 
444   TF_DISALLOW_COPY_AND_ASSIGN(GcsFileSystem);
445 };
446 
447 /// Google Cloud Storage implementation of a file system with retry on failures.
448 class RetryingGcsFileSystem : public RetryingFileSystem<GcsFileSystem> {
449  public:
RetryingGcsFileSystem()450   RetryingGcsFileSystem()
451       : RetryingFileSystem(std::unique_ptr<GcsFileSystem>(new GcsFileSystem),
452                            RetryConfig(100000 /* init_delay_time_us */)) {}
453 };
454 
455 }  // namespace tensorflow
456 
457 #endif  // TENSORFLOW_CORE_PLATFORM_CLOUD_GCS_FILE_SYSTEM_H_
458