xref: /aosp_15_r20/external/tensorflow/tensorflow/core/platform/cloud/gcs_file_system.cc (revision b6fb3261f9314811a0f4371741dbb8839866f948)
1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/platform/cloud/gcs_file_system.h"
17 
18 #include <stdio.h>
19 #include <unistd.h>
20 
21 #include <algorithm>
22 #include <cstdio>
23 #include <cstdlib>
24 #include <cstring>
25 #include <fstream>
26 #include <functional>
27 #include <string>
28 #include <utility>
29 #include <vector>
30 
31 #include "tensorflow/core/platform/file_statistics.h"
32 #include "tensorflow/core/platform/strcat.h"
33 #ifdef _WIN32
34 #include <io.h>  // for _mktemp
35 #endif
36 #include "absl/base/macros.h"
37 #include "json/json.h"
38 #include "tensorflow/core/lib/gtl/map_util.h"
39 #include "tensorflow/core/platform/cloud/curl_http_request.h"
40 #include "tensorflow/core/platform/cloud/file_block_cache.h"
41 #include "tensorflow/core/platform/cloud/google_auth_provider.h"
42 #include "tensorflow/core/platform/cloud/ram_file_block_cache.h"
43 #include "tensorflow/core/platform/cloud/time_util.h"
44 #include "tensorflow/core/platform/env.h"
45 #include "tensorflow/core/platform/errors.h"
46 #include "tensorflow/core/platform/mutex.h"
47 #include "tensorflow/core/platform/numbers.h"
48 #include "tensorflow/core/platform/path.h"
49 #include "tensorflow/core/platform/protobuf.h"
50 #include "tensorflow/core/platform/retrying_utils.h"
51 #include "tensorflow/core/platform/str_util.h"
52 #include "tensorflow/core/platform/stringprintf.h"
53 #include "tensorflow/core/platform/thread_annotations.h"
54 #include "tensorflow/core/profiler/lib/traceme.h"
55 
56 #ifdef _WIN32
57 #ifdef DeleteFile
58 #undef DeleteFile
59 #endif
60 #endif
61 
62 namespace tensorflow {
63 namespace {
64 
65 constexpr char kGcsUriBase[] = "https://www.googleapis.com/storage/v1/";
66 constexpr char kGcsUploadUriBase[] =
67     "https://www.googleapis.com/upload/storage/v1/";
68 constexpr char kStorageHost[] = "storage.googleapis.com";
69 constexpr char kBucketMetadataLocationKey[] = "location";
70 constexpr size_t kReadAppendableFileBufferSize = 1024 * 1024;  // In bytes.
71 constexpr int kGetChildrenDefaultPageSize = 1000;
72 // The HTTP response code "308 Resume Incomplete".
73 constexpr uint64 HTTP_CODE_RESUME_INCOMPLETE = 308;
74 // The HTTP response code "412 Precondition Failed".
75 constexpr uint64 HTTP_CODE_PRECONDITION_FAILED = 412;
76 // The environment variable that overrides the size of the readahead buffer.
77 ABSL_DEPRECATED("Use GCS_READ_CACHE_BLOCK_SIZE_MB instead.")
78 constexpr char kReadaheadBufferSize[] = "GCS_READAHEAD_BUFFER_SIZE_BYTES";
79 // The environment variable that overrides the maximum age of entries in the
80 // Stat cache. A value of 0 means nothing is cached.
81 constexpr char kStatCacheMaxAge[] = "GCS_STAT_CACHE_MAX_AGE";
82 constexpr uint64 kStatCacheDefaultMaxAge = 5;
83 // The environment variable that overrides the maximum number of entries in the
84 // Stat cache.
85 constexpr char kStatCacheMaxEntries[] = "GCS_STAT_CACHE_MAX_ENTRIES";
86 constexpr size_t kStatCacheDefaultMaxEntries = 1024;
87 // The environment variable that overrides the maximum age of entries in the
88 // GetMatchingPaths cache. A value of 0 (the default) means nothing is cached.
89 constexpr char kMatchingPathsCacheMaxAge[] = "GCS_MATCHING_PATHS_CACHE_MAX_AGE";
90 constexpr uint64 kMatchingPathsCacheDefaultMaxAge = 0;
91 // The environment variable that overrides the maximum number of entries in the
92 // GetMatchingPaths cache.
93 constexpr char kMatchingPathsCacheMaxEntries[] =
94     "GCS_MATCHING_PATHS_CACHE_MAX_ENTRIES";
95 constexpr size_t kMatchingPathsCacheDefaultMaxEntries = 1024;
96 // Number of bucket locations cached, most workloads wont touch more than one
97 // bucket so this limit is set fairly low
98 constexpr size_t kBucketLocationCacheMaxEntries = 10;
99 // ExpiringLRUCache doesnt support any "cache forever" option
100 constexpr size_t kCacheNeverExpire = std::numeric_limits<uint64>::max();
101 // The file statistics returned by Stat() for directories.
102 const FileStatistics DIRECTORY_STAT(0, 0, true);
103 // Some environments exhibit unreliable DNS resolution. Set this environment
104 // variable to a positive integer describing the frequency used to refresh the
105 // userspace DNS cache.
106 constexpr char kResolveCacheSecs[] = "GCS_RESOLVE_REFRESH_SECS";
107 // The environment variable to configure the http request's connection timeout.
108 constexpr char kRequestConnectionTimeout[] =
109     "GCS_REQUEST_CONNECTION_TIMEOUT_SECS";
110 // The environment variable to configure the http request's idle timeout.
111 constexpr char kRequestIdleTimeout[] = "GCS_REQUEST_IDLE_TIMEOUT_SECS";
112 // The environment variable to configure the overall request timeout for
113 // metadata requests.
114 constexpr char kMetadataRequestTimeout[] = "GCS_METADATA_REQUEST_TIMEOUT_SECS";
115 // The environment variable to configure the overall request timeout for
116 // block reads requests.
117 constexpr char kReadRequestTimeout[] = "GCS_READ_REQUEST_TIMEOUT_SECS";
118 // The environment variable to configure the overall request timeout for
119 // upload requests.
120 constexpr char kWriteRequestTimeout[] = "GCS_WRITE_REQUEST_TIMEOUT_SECS";
121 // The environment variable to configure an additional header to send with
122 // all requests to GCS (format HEADERNAME:HEADERCONTENT)
123 constexpr char kAdditionalRequestHeader[] = "GCS_ADDITIONAL_REQUEST_HEADER";
124 // The environment variable to configure the throttle (format: <int64_t>)
125 constexpr char kThrottleRate[] = "GCS_THROTTLE_TOKEN_RATE";
126 // The environment variable to configure the token bucket size (format:
127 // <int64_t>)
128 constexpr char kThrottleBucket[] = "GCS_THROTTLE_BUCKET_SIZE";
129 // The environment variable that controls the number of tokens per request.
130 // (format: <int64_t>)
131 constexpr char kTokensPerRequest[] = "GCS_TOKENS_PER_REQUEST";
132 // The environment variable to configure the initial tokens (format: <int64_t>)
133 constexpr char kInitialTokens[] = "GCS_INITIAL_TOKENS";
134 
135 // The environment variable to customize which GCS bucket locations are allowed,
136 // if the list is empty defaults to using the region of the zone (format, comma
137 // delimited list). Requires 'storage.buckets.get' permission.
138 constexpr char kAllowedBucketLocations[] = "GCS_ALLOWED_BUCKET_LOCATIONS";
139 // When this value is passed as an allowed location detects the zone tensorflow
140 // is running in and restricts to buckets in that region.
141 constexpr char kDetectZoneSentinelValue[] = "auto";
142 
143 // How to upload new data when Flush() is called multiple times.
144 // By default the entire file is reuploaded.
145 constexpr char kAppendMode[] = "GCS_APPEND_MODE";
146 // If GCS_APPEND_MODE=compose then instead the new data is uploaded to a
147 // temporary object and composed with the original object. This is disabled by
148 // default as the multiple API calls required add a risk of stranding temporary
149 // objects.
150 constexpr char kComposeAppend[] = "compose";
151 
GetTmpFilename(string * filename)152 Status GetTmpFilename(string* filename) {
153   *filename = io::GetTempFilename("");
154   return OkStatus();
155 }
156 
157 /// Appends a trailing slash if the name doesn't already have one.
MaybeAppendSlash(const string & name)158 string MaybeAppendSlash(const string& name) {
159   if (name.empty()) {
160     return "/";
161   }
162   if (name.back() != '/') {
163     return strings::StrCat(name, "/");
164   }
165   return name;
166 }
167 
168 // io::JoinPath() doesn't work in cases when we want an empty subpath
169 // to result in an appended slash in order for directory markers
170 // to be processed correctly: "gs://a/b" + "" should give "gs://a/b/".
JoinGcsPath(const string & path,const string & subpath)171 string JoinGcsPath(const string& path, const string& subpath) {
172   return strings::StrCat(MaybeAppendSlash(path), subpath);
173 }
174 
175 /// \brief Returns the given paths appending all their subfolders.
176 ///
177 /// For every path X in the list, every subfolder in X is added to the
178 /// resulting list.
179 /// For example:
180 ///  - for 'a/b/c/d' it will append 'a', 'a/b' and 'a/b/c'
181 ///  - for 'a/b/c/' it will append 'a', 'a/b' and 'a/b/c'
182 ///  - for 'a//b/c/' it will append 'a', 'a//b' and 'a//b/c'
183 ///  - for '/a/b/c/' it will append '/a', '/a/b' and '/a/b/c'
AddAllSubpaths(const std::vector<string> & paths)184 std::set<string> AddAllSubpaths(const std::vector<string>& paths) {
185   std::set<string> result;
186   result.insert(paths.begin(), paths.end());
187   for (const string& path : paths) {
188     StringPiece subpath = io::Dirname(path);
189     // If `path` starts with `/`, `subpath` will be `/` and then we get into an
190     // infinite loop. Same behavior happens if there is a `//` pattern in
191     // `path`, so we check for that and leave the loop quicker.
192     while (!(subpath.empty() || subpath == "/")) {
193       result.emplace(string(subpath));
194       subpath = io::Dirname(subpath);
195     }
196   }
197   return result;
198 }
199 
ParseJson(StringPiece json,Json::Value * result)200 Status ParseJson(StringPiece json, Json::Value* result) {
201   Json::Reader reader;
202   if (!reader.parse(json.data(), json.data() + json.size(), *result)) {
203     return errors::Internal("Couldn't parse JSON response from GCS.");
204   }
205   return OkStatus();
206 }
207 
ParseJson(const std::vector<char> & json,Json::Value * result)208 Status ParseJson(const std::vector<char>& json, Json::Value* result) {
209   return ParseJson(StringPiece{json.data(), json.size()}, result);
210 }
211 
212 /// Reads a JSON value with the given name from a parent JSON value.
GetValue(const Json::Value & parent,const char * name,Json::Value * result)213 Status GetValue(const Json::Value& parent, const char* name,
214                 Json::Value* result) {
215   *result = parent.get(name, Json::Value::null);
216   if (result->isNull()) {
217     return errors::Internal("The field '", name,
218                             "' was expected in the JSON response.");
219   }
220   return OkStatus();
221 }
222 
223 /// Reads a string JSON value with the given name from a parent JSON value.
GetStringValue(const Json::Value & parent,const char * name,string * result)224 Status GetStringValue(const Json::Value& parent, const char* name,
225                       string* result) {
226   Json::Value result_value;
227   TF_RETURN_IF_ERROR(GetValue(parent, name, &result_value));
228   if (!result_value.isString()) {
229     return errors::Internal(
230         "The field '", name,
231         "' in the JSON response was expected to be a string.");
232   }
233   *result = result_value.asString();
234   return OkStatus();
235 }
236 
237 /// Reads a long JSON value with the given name from a parent JSON value.
GetInt64Value(const Json::Value & parent,const char * name,int64_t * result)238 Status GetInt64Value(const Json::Value& parent, const char* name,
239                      int64_t* result) {
240   Json::Value result_value;
241   TF_RETURN_IF_ERROR(GetValue(parent, name, &result_value));
242   if (result_value.isNumeric()) {
243     *result = result_value.asInt64();
244     return OkStatus();
245   }
246   if (result_value.isString() &&
247       strings::safe_strto64(result_value.asCString(), result)) {
248     return OkStatus();
249   }
250   return errors::Internal(
251       "The field '", name,
252       "' in the JSON response was expected to be a number.");
253 }
254 
255 /// Reads a boolean JSON value with the given name from a parent JSON value.
GetBoolValue(const Json::Value & parent,const char * name,bool * result)256 Status GetBoolValue(const Json::Value& parent, const char* name, bool* result) {
257   Json::Value result_value;
258   TF_RETURN_IF_ERROR(GetValue(parent, name, &result_value));
259   if (!result_value.isBool()) {
260     return errors::Internal(
261         "The field '", name,
262         "' in the JSON response was expected to be a boolean.");
263   }
264   *result = result_value.asBool();
265   return OkStatus();
266 }
267 
268 /// A GCS-based implementation of a random access file with an LRU block cache.
269 class GcsRandomAccessFile : public RandomAccessFile {
270  public:
271   using ReadFn =
272       std::function<Status(const string& filename, uint64 offset, size_t n,
273                            StringPiece* result, char* scratch)>;
274 
GcsRandomAccessFile(const string & filename,ReadFn read_fn)275   GcsRandomAccessFile(const string& filename, ReadFn read_fn)
276       : filename_(filename), read_fn_(std::move(read_fn)) {}
277 
Name(StringPiece * result) const278   Status Name(StringPiece* result) const override {
279     *result = filename_;
280     return OkStatus();
281   }
282 
283   /// The implementation of reads with an LRU block cache. Thread safe.
Read(uint64 offset,size_t n,StringPiece * result,char * scratch) const284   Status Read(uint64 offset, size_t n, StringPiece* result,
285               char* scratch) const override {
286     return read_fn_(filename_, offset, n, result, scratch);
287   }
288 
289  private:
290   /// The filename of this file.
291   const string filename_;
292   /// The implementation of the read operation (provided by the GCSFileSystem).
293   const ReadFn read_fn_;
294 };
295 
296 /// A GCS-based implementation of a random access file with a read buffer.
297 class BufferedGcsRandomAccessFile : public RandomAccessFile {
298  public:
299   using ReadFn =
300       std::function<Status(const string& filename, uint64 offset, size_t n,
301                            StringPiece* result, char* scratch)>;
302 
303   // Initialize the reader. Provided read_fn should be thread safe.
BufferedGcsRandomAccessFile(const string & filename,uint64 buffer_size,ReadFn read_fn)304   BufferedGcsRandomAccessFile(const string& filename, uint64 buffer_size,
305                               ReadFn read_fn)
306       : filename_(filename),
307         read_fn_(std::move(read_fn)),
308         buffer_size_(buffer_size),
309         buffer_start_(0),
310         buffer_end_is_past_eof_(false) {}
311 
Name(StringPiece * result) const312   Status Name(StringPiece* result) const override {
313     *result = filename_;
314     return OkStatus();
315   }
316 
317   /// The implementation of reads with an read buffer. Thread safe.
318   /// Returns `OUT_OF_RANGE` if fewer than n bytes were stored in `*result`
319   /// because of EOF.
Read(uint64 offset,size_t n,StringPiece * result,char * scratch) const320   Status Read(uint64 offset, size_t n, StringPiece* result,
321               char* scratch) const override {
322     if (n > buffer_size_) {
323       return read_fn_(filename_, offset, n, result, scratch);
324     }
325     {
326       mutex_lock l(buffer_mutex_);
327       size_t buffer_end = buffer_start_ + buffer_.size();
328       size_t copy_size = 0;
329       if (offset < buffer_end && offset >= buffer_start_) {
330         copy_size = std::min(n, static_cast<size_t>(buffer_end - offset));
331         memcpy(scratch, buffer_.data() + (offset - buffer_start_), copy_size);
332         *result = StringPiece(scratch, copy_size);
333       }
334       bool consumed_buffer_to_eof =
335           offset + copy_size >= buffer_end && buffer_end_is_past_eof_;
336       if (copy_size < n && !consumed_buffer_to_eof) {
337         Status status = FillBuffer(offset + copy_size);
338         if (!status.ok() && !errors::IsOutOfRange(status)) {
339           // Empty the buffer to avoid caching bad reads.
340           buffer_.resize(0);
341           return status;
342         }
343         size_t remaining_copy = std::min(n - copy_size, buffer_.size());
344         memcpy(scratch + copy_size, buffer_.data(), remaining_copy);
345         copy_size += remaining_copy;
346         *result = StringPiece(scratch, copy_size);
347       }
348       if (copy_size < n) {
349         // Forget the end-of-file flag to allow for clients that poll on the
350         // same file.
351         buffer_end_is_past_eof_ = false;
352         return errors::OutOfRange("EOF reached. Requested to read ", n,
353                                   " bytes from ", offset, ".");
354       }
355     }
356     return OkStatus();
357   }
358 
359  private:
FillBuffer(uint64 start) const360   Status FillBuffer(uint64 start) const
361       TF_EXCLUSIVE_LOCKS_REQUIRED(buffer_mutex_) {
362     buffer_start_ = start;
363     buffer_.resize(buffer_size_);
364     StringPiece str_piece;
365     Status status = read_fn_(filename_, buffer_start_, buffer_size_, &str_piece,
366                              &(buffer_[0]));
367     buffer_end_is_past_eof_ = errors::IsOutOfRange(status);
368     buffer_.resize(str_piece.size());
369     return status;
370   }
371 
372   // The filename of this file.
373   const string filename_;
374 
375   // The implementation of the read operation (provided by the GCSFileSystem).
376   const ReadFn read_fn_;
377 
378   // Size of buffer that we read from GCS each time we send a request.
379   const uint64 buffer_size_;
380 
381   // Mutex for buffering operations that can be accessed from multiple threads.
382   // The following members are mutable in order to provide a const Read.
383   mutable mutex buffer_mutex_;
384 
385   // Offset of buffer from start of the file.
386   mutable uint64 buffer_start_ TF_GUARDED_BY(buffer_mutex_);
387 
388   mutable bool buffer_end_is_past_eof_ TF_GUARDED_BY(buffer_mutex_);
389 
390   mutable string buffer_ TF_GUARDED_BY(buffer_mutex_);
391 };
392 
393 // Function object declaration with params needed to create upload sessions.
394 typedef std::function<Status(
395     uint64 start_offset, const std::string& object_to_upload,
396     const std::string& bucket, uint64 file_size, const std::string& gcs_path,
397     UploadSessionHandle* session_handle)>
398     SessionCreator;
399 
400 // Function object declaration with params needed to upload objects.
401 typedef std::function<Status(const std::string& session_uri,
402                              uint64 start_offset, uint64 already_uploaded,
403                              const std::string& tmp_content_filename,
404                              uint64 file_size, const std::string& file_path)>
405     ObjectUploader;
406 
407 // Function object declaration with params needed to poll upload status.
408 typedef std::function<Status(const string& session_uri, uint64 file_size,
409                              const std::string& gcs_path, bool* completed,
410                              uint64* uploaded)>
411     StatusPoller;
412 
413 // Function object declaration with params needed to poll upload status.
414 typedef std::function<Status(const string& fname, const string& bucket,
415                              const string& object, int64_t* generation)>
416     GenerationGetter;
417 
418 /// \brief GCS-based implementation of a writeable file.
419 ///
420 /// Since GCS objects are immutable, this implementation writes to a local
421 /// tmp file and copies it to GCS on flush/close.
422 class GcsWritableFile : public WritableFile {
423  public:
GcsWritableFile(const string & bucket,const string & object,GcsFileSystem * filesystem,GcsFileSystem::TimeoutConfig * timeouts,std::function<void ()> file_cache_erase,RetryConfig retry_config,bool compose_append,SessionCreator session_creator,ObjectUploader object_uploader,StatusPoller status_poller,GenerationGetter generation_getter)424   GcsWritableFile(const string& bucket, const string& object,
425                   GcsFileSystem* filesystem,
426                   GcsFileSystem::TimeoutConfig* timeouts,
427                   std::function<void()> file_cache_erase,
428                   RetryConfig retry_config, bool compose_append,
429                   SessionCreator session_creator,
430                   ObjectUploader object_uploader, StatusPoller status_poller,
431                   GenerationGetter generation_getter)
432       : bucket_(bucket),
433         object_(object),
434         filesystem_(filesystem),
435         timeouts_(timeouts),
436         file_cache_erase_(std::move(file_cache_erase)),
437         sync_needed_(true),
438         retry_config_(retry_config),
439         compose_append_(compose_append),
440         start_offset_(0),
441         session_creator_(std::move(session_creator)),
442         object_uploader_(std::move(object_uploader)),
443         status_poller_(std::move(status_poller)),
444         generation_getter_(std::move(generation_getter)) {
445     // TODO: to make it safer, outfile_ should be constructed from an FD
446     VLOG(3) << "GcsWritableFile: " << GetGcsPath();
447     if (GetTmpFilename(&tmp_content_filename_).ok()) {
448       outfile_.open(tmp_content_filename_,
449                     std::ofstream::binary | std::ofstream::app);
450     }
451   }
452 
453   /// \brief Constructs the writable file in append mode.
454   ///
455   /// tmp_content_filename should contain a path of an existing temporary file
456   /// with the content to be appended. The class takes ownership of the
457   /// specified tmp file and deletes it on close.
GcsWritableFile(const string & bucket,const string & object,GcsFileSystem * filesystem,const string & tmp_content_filename,GcsFileSystem::TimeoutConfig * timeouts,std::function<void ()> file_cache_erase,RetryConfig retry_config,bool compose_append,SessionCreator session_creator,ObjectUploader object_uploader,StatusPoller status_poller,GenerationGetter generation_getter)458   GcsWritableFile(const string& bucket, const string& object,
459                   GcsFileSystem* filesystem, const string& tmp_content_filename,
460                   GcsFileSystem::TimeoutConfig* timeouts,
461                   std::function<void()> file_cache_erase,
462                   RetryConfig retry_config, bool compose_append,
463                   SessionCreator session_creator,
464                   ObjectUploader object_uploader, StatusPoller status_poller,
465                   GenerationGetter generation_getter)
466       : bucket_(bucket),
467         object_(object),
468         filesystem_(filesystem),
469         timeouts_(timeouts),
470         file_cache_erase_(std::move(file_cache_erase)),
471         sync_needed_(true),
472         retry_config_(retry_config),
473         compose_append_(compose_append),
474         start_offset_(0),
475         session_creator_(std::move(session_creator)),
476         object_uploader_(std::move(object_uploader)),
477         status_poller_(std::move(status_poller)),
478         generation_getter_(std::move(generation_getter)) {
479     VLOG(3) << "GcsWritableFile: " << GetGcsPath() << "with existing file "
480             << tmp_content_filename;
481     tmp_content_filename_ = tmp_content_filename;
482     outfile_.open(tmp_content_filename_,
483                   std::ofstream::binary | std::ofstream::app);
484   }
485 
// Makes a final best-effort upload, then deletes the local temporary file.
// A destructor cannot report failure, so the Close() status is dropped.
~GcsWritableFile() override {
  Status close_status = Close();
  close_status.IgnoreError();
  std::remove(tmp_content_filename_.c_str());
}
490 
Append(StringPiece data)491   Status Append(StringPiece data) override {
492     TF_RETURN_IF_ERROR(CheckWritable());
493     VLOG(3) << "Append: " << GetGcsPath() << " size " << data.length();
494     sync_needed_ = true;
495     outfile_ << data;
496     if (!outfile_.good()) {
497       return errors::Internal(
498           "Could not append to the internal temporary file.");
499     }
500     return OkStatus();
501   }
502 
Close()503   Status Close() override {
504     VLOG(3) << "Close:" << GetGcsPath();
505     if (outfile_.is_open()) {
506       Status sync_status = Sync();
507       if (sync_status.ok()) {
508         outfile_.close();
509       }
510       return sync_status;
511     }
512     return OkStatus();
513   }
514 
Flush()515   Status Flush() override {
516     VLOG(3) << "Flush:" << GetGcsPath();
517     return Sync();
518   }
519 
Name(StringPiece * result) const520   Status Name(StringPiece* result) const override {
521     return errors::Unimplemented("GCSWritableFile does not support Name()");
522   }
523 
Sync()524   Status Sync() override {
525     VLOG(3) << "Sync started:" << GetGcsPath();
526     TF_RETURN_IF_ERROR(CheckWritable());
527     if (!sync_needed_) {
528       return OkStatus();
529     }
530     Status status = SyncImpl();
531     VLOG(3) << "Sync finished " << GetGcsPath();
532     if (status.ok()) {
533       sync_needed_ = false;
534     }
535     return status;
536   }
537 
Tell(int64_t * position)538   Status Tell(int64_t* position) override {
539     *position = outfile_.tellp();
540     if (*position == -1) {
541       return errors::Internal("tellp on the internal temporary file failed");
542     }
543     return OkStatus();
544   }
545 
546  private:
547   /// Copies the current version of the file to GCS.
548   ///
549   /// This SyncImpl() uploads the object to GCS.
550   /// In case of a failure, it resumes failed uploads as recommended by the GCS
551   /// resumable API documentation. When the whole upload needs to be
552   /// restarted, Sync() returns UNAVAILABLE and relies on RetryingFileSystem.
SyncImpl()553   Status SyncImpl() {
554     outfile_.flush();
555     if (!outfile_.good()) {
556       return errors::Internal(
557           "Could not write to the internal temporary file.");
558     }
559     UploadSessionHandle session_handle;
560     uint64 start_offset = 0;
561     string object_to_upload = object_;
562     bool should_compose = false;
563     if (compose_append_) {
564       start_offset = start_offset_;
565       // Only compose if the object has already been uploaded to GCS
566       should_compose = start_offset > 0;
567       if (should_compose) {
568         object_to_upload =
569             strings::StrCat(io::Dirname(object_), "/.tmpcompose/",
570                             io::Basename(object_), ".", start_offset_);
571       }
572     }
573     TF_RETURN_IF_ERROR(CreateNewUploadSession(start_offset, object_to_upload,
574                                               &session_handle));
575     uint64 already_uploaded = 0;
576     bool first_attempt = true;
577     const Status upload_status = RetryingUtils::CallWithRetries(
578         [&first_attempt, &already_uploaded, &session_handle, &start_offset,
579          this]() {
580           if (session_handle.resumable && !first_attempt) {
581             bool completed;
582             TF_RETURN_IF_ERROR(RequestUploadSessionStatus(
583                 session_handle.session_uri, &completed, &already_uploaded));
584             LOG(INFO) << "### RequestUploadSessionStatus: completed = "
585                       << completed
586                       << ", already_uploaded = " << already_uploaded
587                       << ", file = " << GetGcsPath();
588             if (completed) {
589               // Erase the file from the file cache on every successful write.
590               file_cache_erase_();
591               // It's unclear why UploadToSession didn't return OK in the
592               // previous attempt, but GCS reports that the file is fully
593               // uploaded, so succeed.
594               return OkStatus();
595             }
596           }
597           first_attempt = false;
598           return UploadToSession(session_handle.session_uri, start_offset,
599                                  already_uploaded);
600         },
601         retry_config_);
602     if (errors::IsNotFound(upload_status)) {
603       // GCS docs recommend retrying the whole upload. We're relying on the
604       // RetryingFileSystem to retry the Sync() call.
605       return errors::Unavailable(strings::StrCat(
606           "Upload to gs://", bucket_, "/", object_,
607           " failed, caused by: ", upload_status.error_message()));
608     }
609     if (upload_status.ok()) {
610       if (should_compose) {
611         TF_RETURN_IF_ERROR(AppendObject(object_to_upload));
612       }
613       TF_RETURN_IF_ERROR(GetCurrentFileSize(&start_offset_));
614     }
615     return upload_status;
616   }
617 
CheckWritable() const618   Status CheckWritable() const {
619     if (!outfile_.is_open()) {
620       return errors::FailedPrecondition(
621           "The internal temporary file is not writable.");
622     }
623     return OkStatus();
624   }
625 
GetCurrentFileSize(uint64 * size)626   Status GetCurrentFileSize(uint64* size) {
627     const auto tellp = outfile_.tellp();
628     if (tellp == static_cast<std::streampos>(-1)) {
629       return errors::Internal(
630           "Could not get the size of the internal temporary file.");
631     }
632     *size = tellp;
633     return OkStatus();
634   }
635 
636   /// Initiates a new resumable upload session.
CreateNewUploadSession(uint64 start_offset,std::string object_to_upload,UploadSessionHandle * session_handle)637   Status CreateNewUploadSession(uint64 start_offset,
638                                 std::string object_to_upload,
639                                 UploadSessionHandle* session_handle) {
640     uint64 file_size;
641     TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size));
642     return session_creator_(start_offset, object_to_upload, bucket_, file_size,
643                             GetGcsPath(), session_handle);
644   }
645 
646   /// Appends the data of append_object to the original object and deletes
647   /// append_object.
AppendObject(string append_object)648   Status AppendObject(string append_object) {
649     const string append_object_path = GetGcsPathWithObject(append_object);
650     VLOG(3) << "AppendObject: " << append_object_path << " to " << GetGcsPath();
651 
652     int64_t generation = 0;
653     TF_RETURN_IF_ERROR(
654         generation_getter_(GetGcsPath(), bucket_, object_, &generation));
655 
656     TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
657         [&append_object, &generation, this]() {
658           std::unique_ptr<HttpRequest> request;
659           TF_RETURN_IF_ERROR(filesystem_->CreateHttpRequest(&request));
660 
661           request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket_, "/o/",
662                                           request->EscapeString(object_),
663                                           "/compose"));
664 
665           const string request_body = strings::StrCat(
666               "{'sourceObjects': [{'name': '", object_,
667               "','objectPrecondition':{'ifGenerationMatch':", generation,
668               "}},{'name': '", append_object, "'}]}");
669           request->SetTimeouts(timeouts_->connect, timeouts_->idle,
670                                timeouts_->metadata);
671           request->AddHeader("content-type", "application/json");
672           request->SetPostFromBuffer(request_body.c_str(), request_body.size());
673           TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(),
674                                           " when composing to ", GetGcsPath());
675           return OkStatus();
676         },
677         retry_config_));
678 
679     return RetryingUtils::DeleteWithRetries(
680         [&append_object_path, this]() {
681           return filesystem_->DeleteFile(append_object_path, nullptr);
682         },
683         retry_config_);
684   }
685 
686   /// \brief Requests status of a previously initiated upload session.
687   ///
688   /// If the upload has already succeeded, sets 'completed' to true.
689   /// Otherwise sets 'completed' to false and 'uploaded' to the currently
690   /// uploaded size in bytes.
RequestUploadSessionStatus(const string & session_uri,bool * completed,uint64 * uploaded)691   Status RequestUploadSessionStatus(const string& session_uri, bool* completed,
692                                     uint64* uploaded) {
693     uint64 file_size;
694     TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size));
695     return status_poller_(session_uri, file_size, GetGcsPath(), completed,
696                           uploaded);
697   }
698 
699   /// Uploads data to object.
UploadToSession(const string & session_uri,uint64 start_offset,uint64 already_uploaded)700   Status UploadToSession(const string& session_uri, uint64 start_offset,
701                          uint64 already_uploaded) {
702     uint64 file_size;
703     TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size));
704     Status status =
705         object_uploader_(session_uri, start_offset, already_uploaded,
706                          tmp_content_filename_, file_size, GetGcsPath());
707     if (status.ok()) {
708       // Erase the file from the file cache on every successful write.
709       // Note: Only local cache, this does nothing on distributed cache. The
710       // distributed cache clears the cache as it is needed.
711       file_cache_erase_();
712     }
713 
714     return status;
715   }
716 
GetGcsPathWithObject(string object) const717   string GetGcsPathWithObject(string object) const {
718     return strings::StrCat("gs://", bucket_, "/", object);
719   }
GetGcsPath() const720   string GetGcsPath() const { return GetGcsPathWithObject(object_); }
721 
722   string bucket_;
723   string object_;
724   GcsFileSystem* const filesystem_;  // Not owned.
725   string tmp_content_filename_;
726   std::ofstream outfile_;
727   GcsFileSystem::TimeoutConfig* timeouts_;
728   std::function<void()> file_cache_erase_;
729   bool sync_needed_;  // whether there is buffered data that needs to be synced
730   RetryConfig retry_config_;
731   bool compose_append_;
732   uint64 start_offset_;
733   // Callbacks to the file system used to upload object into GCS.
734   const SessionCreator session_creator_;
735   const ObjectUploader object_uploader_;
736   const StatusPoller status_poller_;
737   const GenerationGetter generation_getter_;
738 };
739 
740 class GcsReadOnlyMemoryRegion : public ReadOnlyMemoryRegion {
741  public:
GcsReadOnlyMemoryRegion(std::unique_ptr<char[]> data,uint64 length)742   GcsReadOnlyMemoryRegion(std::unique_ptr<char[]> data, uint64 length)
743       : data_(std::move(data)), length_(length) {}
data()744   const void* data() override { return reinterpret_cast<void*>(data_.get()); }
length()745   uint64 length() override { return length_; }
746 
747  private:
748   std::unique_ptr<char[]> data_;
749   uint64 length_;
750 };
751 
StringPieceIdentity(StringPiece str,StringPiece * value)752 bool StringPieceIdentity(StringPiece str, StringPiece* value) {
753   *value = str;
754   return true;
755 }
756 
757 /// \brief Utility function to split a comma delimited list of strings to an
758 /// unordered set, lowercasing all values.
SplitByCommaToLowercaseSet(StringPiece list,std::unordered_set<string> * set)759 bool SplitByCommaToLowercaseSet(StringPiece list,
760                                 std::unordered_set<string>* set) {
761   std::vector<string> vector = absl::StrSplit(absl::AsciiStrToLower(list), ',');
762   *set = std::unordered_set<string>(vector.begin(), vector.end());
763   return true;
764 }
765 
// \brief Converts a Compute Engine zone to its region by dropping everything
// from the last '-' onward, e.g. "us-central1-a" -> "us-central1". A zone
// that contains no '-' is returned unchanged.
string ZoneToRegion(string* zone) {
  const string::size_type last_dash = zone->rfind('-');
  return zone->substr(0, last_dash);
}
770 
771 }  // namespace
772 
GcsFileSystem(bool make_default_cache)773 GcsFileSystem::GcsFileSystem(bool make_default_cache) {
774   uint64 value;
775   block_size_ = kDefaultBlockSize;
776   size_t max_bytes = kDefaultMaxCacheSize;
777 
778   uint64 max_staleness = kDefaultMaxStaleness;
779 
780   http_request_factory_ = std::make_shared<CurlHttpRequest::Factory>();
781   compute_engine_metadata_client_ =
782       std::make_shared<ComputeEngineMetadataClient>(http_request_factory_);
783   auth_provider_ = std::unique_ptr<AuthProvider>(
784       new GoogleAuthProvider(compute_engine_metadata_client_));
785   zone_provider_ = std::unique_ptr<ZoneProvider>(
786       new ComputeEngineZoneProvider(compute_engine_metadata_client_));
787 
788   // Apply the sys env override for the readahead buffer size if it's provided.
789   if (GetEnvVar(kReadaheadBufferSize, strings::safe_strtou64, &value)) {
790     block_size_ = value;
791   }
792 
793   // Apply the overrides for the block size (MB), max bytes (MB), and max
794   // staleness (seconds) if provided.
795   if (GetEnvVar(kBlockSize, strings::safe_strtou64, &value)) {
796     block_size_ = value * 1024 * 1024;
797   }
798 
799   if (GetEnvVar(kMaxCacheSize, strings::safe_strtou64, &value)) {
800     max_bytes = value * 1024 * 1024;
801   }
802 
803   if (GetEnvVar(kMaxStaleness, strings::safe_strtou64, &value)) {
804     max_staleness = value;
805   }
806   if (!make_default_cache) {
807     max_bytes = 0;
808   }
809   VLOG(1) << "GCS cache max size = " << max_bytes << " ; "
810           << "block size = " << block_size_ << " ; "
811           << "max staleness = " << max_staleness;
812   file_block_cache_ = MakeFileBlockCache(block_size_, max_bytes, max_staleness);
813   // Apply overrides for the stat cache max age and max entries, if provided.
814   uint64 stat_cache_max_age = kStatCacheDefaultMaxAge;
815   size_t stat_cache_max_entries = kStatCacheDefaultMaxEntries;
816   if (GetEnvVar(kStatCacheMaxAge, strings::safe_strtou64, &value)) {
817     stat_cache_max_age = value;
818   }
819   if (GetEnvVar(kStatCacheMaxEntries, strings::safe_strtou64, &value)) {
820     stat_cache_max_entries = value;
821   }
822   stat_cache_.reset(new ExpiringLRUCache<GcsFileStat>(stat_cache_max_age,
823                                                       stat_cache_max_entries));
824   // Apply overrides for the matching paths cache max age and max entries, if
825   // provided.
826   uint64 matching_paths_cache_max_age = kMatchingPathsCacheDefaultMaxAge;
827   size_t matching_paths_cache_max_entries =
828       kMatchingPathsCacheDefaultMaxEntries;
829   if (GetEnvVar(kMatchingPathsCacheMaxAge, strings::safe_strtou64, &value)) {
830     matching_paths_cache_max_age = value;
831   }
832   if (GetEnvVar(kMatchingPathsCacheMaxEntries, strings::safe_strtou64,
833                 &value)) {
834     matching_paths_cache_max_entries = value;
835   }
836   matching_paths_cache_.reset(new ExpiringLRUCache<std::vector<string>>(
837       matching_paths_cache_max_age, matching_paths_cache_max_entries));
838 
839   bucket_location_cache_.reset(new ExpiringLRUCache<string>(
840       kCacheNeverExpire, kBucketLocationCacheMaxEntries));
841 
842   int64_t resolve_frequency_secs;
843   if (GetEnvVar(kResolveCacheSecs, strings::safe_strto64,
844                 &resolve_frequency_secs)) {
845     dns_cache_.reset(new GcsDnsCache(resolve_frequency_secs));
846     VLOG(1) << "GCS DNS cache is enabled.  " << kResolveCacheSecs << " = "
847             << resolve_frequency_secs;
848   } else {
849     VLOG(1) << "GCS DNS cache is disabled, because " << kResolveCacheSecs
850             << " = 0 (or is not set)";
851   }
852 
853   // Get the additional header
854   StringPiece add_header_contents;
855   if (GetEnvVar(kAdditionalRequestHeader, StringPieceIdentity,
856                 &add_header_contents)) {
857     size_t split = add_header_contents.find(':', 0);
858 
859     if (split != StringPiece::npos) {
860       StringPiece header_name = add_header_contents.substr(0, split);
861       StringPiece header_value = add_header_contents.substr(split + 1);
862 
863       if (!header_name.empty() && !header_value.empty()) {
864         additional_header_.reset(new std::pair<const string, const string>(
865             string(header_name), string(header_value)));
866 
867         VLOG(1) << "GCS additional header ENABLED. "
868                 << "Name: " << additional_header_->first << ", "
869                 << "Value: " << additional_header_->second;
870       } else {
871         LOG(ERROR) << "GCS additional header DISABLED. Invalid contents: "
872                    << add_header_contents;
873       }
874     } else {
875       LOG(ERROR) << "GCS additional header DISABLED. Invalid contents: "
876                  << add_header_contents;
877     }
878   } else {
879     VLOG(1) << "GCS additional header DISABLED. No environment variable set.";
880   }
881 
882   // Apply the overrides for request timeouts
883   uint32 timeout_value;
884   if (GetEnvVar(kRequestConnectionTimeout, strings::safe_strtou32,
885                 &timeout_value)) {
886     timeouts_.connect = timeout_value;
887   }
888   if (GetEnvVar(kRequestIdleTimeout, strings::safe_strtou32, &timeout_value)) {
889     timeouts_.idle = timeout_value;
890   }
891   if (GetEnvVar(kMetadataRequestTimeout, strings::safe_strtou32,
892                 &timeout_value)) {
893     timeouts_.metadata = timeout_value;
894   }
895   if (GetEnvVar(kReadRequestTimeout, strings::safe_strtou32, &timeout_value)) {
896     timeouts_.read = timeout_value;
897   }
898   if (GetEnvVar(kWriteRequestTimeout, strings::safe_strtou32, &timeout_value)) {
899     timeouts_.write = timeout_value;
900   }
901 
902   int64_t token_value;
903   if (GetEnvVar(kThrottleRate, strings::safe_strto64, &token_value)) {
904     GcsThrottleConfig config;
905     config.enabled = true;
906     config.token_rate = token_value;
907 
908     if (GetEnvVar(kThrottleBucket, strings::safe_strto64, &token_value)) {
909       config.bucket_size = token_value;
910     }
911 
912     if (GetEnvVar(kTokensPerRequest, strings::safe_strto64, &token_value)) {
913       config.tokens_per_request = token_value;
914     }
915 
916     if (GetEnvVar(kInitialTokens, strings::safe_strto64, &token_value)) {
917       config.initial_tokens = token_value;
918     }
919     throttle_.SetConfig(config);
920   }
921 
922   GetEnvVar(kAllowedBucketLocations, SplitByCommaToLowercaseSet,
923             &allowed_locations_);
924 
925   StringPiece append_mode;
926   GetEnvVar(kAppendMode, StringPieceIdentity, &append_mode);
927   if (append_mode == kComposeAppend) {
928     compose_append_ = true;
929   } else {
930     compose_append_ = false;
931   }
932 }
933 
// Constructor in which every dependency and setting is supplied by the
// caller; unlike the make_default_cache constructor, this body reads no
// environment variables.
//
// The file system takes over `auth_provider`, `http_request_factory` and
// `zone_provider` (moved in). `additional_header` is handed directly to
// `additional_header_`, which the other constructor manages through reset(),
// so the file system takes ownership of that pointer as well.
// The block cache is built via MakeFileBlockCache(), which also updates
// `cache_enabled_`. The bucket-location cache always uses kCacheNeverExpire
// and kBucketLocationCacheMaxEntries.
// NOTE(review): initializers run in member declaration order (see the
// header), not in the order listed here.
GcsFileSystem::GcsFileSystem(
    std::unique_ptr<AuthProvider> auth_provider,
    std::unique_ptr<HttpRequest::Factory> http_request_factory,
    std::unique_ptr<ZoneProvider> zone_provider, size_t block_size,
    size_t max_bytes, uint64 max_staleness, uint64 stat_cache_max_age,
    size_t stat_cache_max_entries, uint64 matching_paths_cache_max_age,
    size_t matching_paths_cache_max_entries, RetryConfig retry_config,
    TimeoutConfig timeouts, const std::unordered_set<string>& allowed_locations,
    std::pair<const string, const string>* additional_header,
    bool compose_append)
    : timeouts_(timeouts),
      retry_config_(retry_config),
      auth_provider_(std::move(auth_provider)),
      http_request_factory_(std::move(http_request_factory)),
      zone_provider_(std::move(zone_provider)),
      block_size_(block_size),
      file_block_cache_(
          MakeFileBlockCache(block_size, max_bytes, max_staleness)),
      stat_cache_(new StatCache(stat_cache_max_age, stat_cache_max_entries)),
      matching_paths_cache_(new MatchingPathsCache(
          matching_paths_cache_max_age, matching_paths_cache_max_entries)),
      bucket_location_cache_(new BucketLocationCache(
          kCacheNeverExpire, kBucketLocationCacheMaxEntries)),
      allowed_locations_(allowed_locations),
      compose_append_(compose_append),
      additional_header_(additional_header) {}
960 
NewRandomAccessFile(const string & fname,TransactionToken * token,std::unique_ptr<RandomAccessFile> * result)961 Status GcsFileSystem::NewRandomAccessFile(
962     const string& fname, TransactionToken* token,
963     std::unique_ptr<RandomAccessFile>* result) {
964   string bucket, object;
965   TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
966   TF_RETURN_IF_ERROR(CheckBucketLocationConstraint(bucket));
967   if (cache_enabled_) {
968     result->reset(new GcsRandomAccessFile(fname, [this, bucket, object](
969                                                      const string& fname,
970                                                      uint64 offset, size_t n,
971                                                      StringPiece* result,
972                                                      char* scratch) {
973       tf_shared_lock l(block_cache_lock_);
974       GcsFileStat stat;
975       TF_RETURN_IF_ERROR(stat_cache_->LookupOrCompute(
976           fname, &stat,
977           [this, bucket, object](const string& fname, GcsFileStat* stat) {
978             return UncachedStatForObject(fname, bucket, object, stat);
979           }));
980       if (!file_block_cache_->ValidateAndUpdateFileSignature(
981               fname, stat.generation_number)) {
982         VLOG(1)
983             << "File signature has been changed. Refreshing the cache. Path: "
984             << fname;
985       }
986       *result = StringPiece();
987       size_t bytes_transferred;
988       TF_RETURN_IF_ERROR(file_block_cache_->Read(fname, offset, n, scratch,
989                                                  &bytes_transferred));
990       *result = StringPiece(scratch, bytes_transferred);
991       if (bytes_transferred < n) {
992         return errors::OutOfRange("EOF reached, ", result->size(),
993                                   " bytes were read out of ", n,
994                                   " bytes requested.");
995       }
996       return OkStatus();
997     }));
998   } else {
999     result->reset(new BufferedGcsRandomAccessFile(
1000         fname, block_size_,
1001         [this, bucket, object](const string& fname, uint64 offset, size_t n,
1002                                StringPiece* result, char* scratch) {
1003           *result = StringPiece();
1004           size_t bytes_transferred;
1005           TF_RETURN_IF_ERROR(
1006               LoadBufferFromGCS(fname, offset, n, scratch, &bytes_transferred));
1007           *result = StringPiece(scratch, bytes_transferred);
1008           if (bytes_transferred < n) {
1009             return errors::OutOfRange("EOF reached, ", result->size(),
1010                                       " bytes were read out of ", n,
1011                                       " bytes requested.");
1012           }
1013           return OkStatus();
1014         }));
1015   }
1016   return OkStatus();
1017 }
1018 
ResetFileBlockCache(size_t block_size_bytes,size_t max_bytes,uint64 max_staleness_secs)1019 void GcsFileSystem::ResetFileBlockCache(size_t block_size_bytes,
1020                                         size_t max_bytes,
1021                                         uint64 max_staleness_secs) {
1022   mutex_lock l(block_cache_lock_);
1023   file_block_cache_ =
1024       MakeFileBlockCache(block_size_bytes, max_bytes, max_staleness_secs);
1025   if (stats_ != nullptr) {
1026     stats_->Configure(this, &throttle_, file_block_cache_.get());
1027   }
1028 }
1029 
1030 // A helper function to build a FileBlockCache for GcsFileSystem.
MakeFileBlockCache(size_t block_size,size_t max_bytes,uint64 max_staleness)1031 std::unique_ptr<FileBlockCache> GcsFileSystem::MakeFileBlockCache(
1032     size_t block_size, size_t max_bytes, uint64 max_staleness) {
1033   std::unique_ptr<FileBlockCache> file_block_cache(new RamFileBlockCache(
1034       block_size, max_bytes, max_staleness,
1035       [this](const string& filename, size_t offset, size_t n, char* buffer,
1036              size_t* bytes_transferred) {
1037         return LoadBufferFromGCS(filename, offset, n, buffer,
1038                                  bytes_transferred);
1039       }));
1040 
1041   // Check if cache is enabled here to avoid unnecessary mutex contention.
1042   cache_enabled_ = file_block_cache->IsCacheEnabled();
1043   return file_block_cache;
1044 }
1045 
1046 // A helper function to actually read the data from GCS.
LoadBufferFromGCS(const string & fname,size_t offset,size_t n,char * buffer,size_t * bytes_transferred)1047 Status GcsFileSystem::LoadBufferFromGCS(const string& fname, size_t offset,
1048                                         size_t n, char* buffer,
1049                                         size_t* bytes_transferred) {
1050   *bytes_transferred = 0;
1051 
1052   string bucket, object;
1053   TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
1054 
1055   profiler::TraceMe activity(
1056       [fname]() { return absl::StrCat("LoadBufferFromGCS ", fname); });
1057 
1058   std::unique_ptr<HttpRequest> request;
1059   TF_RETURN_WITH_CONTEXT_IF_ERROR(CreateHttpRequest(&request),
1060                                   "when reading gs://", bucket, "/", object);
1061 
1062   request->SetUri(strings::StrCat("https://", kStorageHost, "/", bucket, "/",
1063                                   request->EscapeString(object)));
1064   request->SetRange(offset, offset + n - 1);
1065   request->SetResultBufferDirect(buffer, n);
1066   request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.read);
1067 
1068   if (stats_ != nullptr) {
1069     stats_->RecordBlockLoadRequest(fname, offset);
1070   }
1071 
1072   TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when reading gs://",
1073                                   bucket, "/", object);
1074 
1075   size_t bytes_read = request->GetResultBufferDirectBytesTransferred();
1076   *bytes_transferred = bytes_read;
1077   VLOG(1) << "Successful read of gs://" << bucket << "/" << object << " @ "
1078           << offset << " of size: " << bytes_read;
1079   activity.AppendMetadata([bytes_read]() {
1080     return profiler::TraceMeEncode({{"block_size", bytes_read}});
1081   });
1082 
1083   if (stats_ != nullptr) {
1084     stats_->RecordBlockRetrieved(fname, offset, bytes_read);
1085   }
1086 
1087   throttle_.RecordResponse(bytes_read);
1088 
1089   if (bytes_read < n) {
1090     // Check stat cache to see if we encountered an interrupted read.
1091     GcsFileStat stat;
1092     if (stat_cache_->Lookup(fname, &stat)) {
1093       if (offset + bytes_read < stat.base.length) {
1094         return errors::Internal(strings::Printf(
1095             "File contents are inconsistent for file: %s @ %lu.", fname.c_str(),
1096             offset));
1097       }
1098       VLOG(2) << "Successful integrity check for: gs://" << bucket << "/"
1099               << object << " @ " << offset;
1100     }
1101   }
1102 
1103   return OkStatus();
1104 }
1105 
1106 /// Initiates a new upload session.
CreateNewUploadSession(uint64 start_offset,const std::string & object_to_upload,const std::string & bucket,uint64 file_size,const std::string & gcs_path,UploadSessionHandle * session_handle)1107 Status GcsFileSystem::CreateNewUploadSession(
1108     uint64 start_offset, const std::string& object_to_upload,
1109     const std::string& bucket, uint64 file_size, const std::string& gcs_path,
1110     UploadSessionHandle* session_handle) {
1111   std::vector<char> output_buffer;
1112   std::unique_ptr<HttpRequest> request;
1113   TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1114 
1115   std::string uri = strings::StrCat(
1116       kGcsUploadUriBase, "b/", bucket,
1117       "/o?uploadType=resumable&name=", request->EscapeString(object_to_upload));
1118   request->SetUri(uri);
1119   request->AddHeader("X-Upload-Content-Length",
1120                      absl::StrCat(file_size - start_offset));
1121   request->SetPostEmptyBody();
1122   request->SetResultBuffer(&output_buffer);
1123   request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1124   TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(),
1125                                   " when initiating an upload to ", gcs_path);
1126   if (session_handle != nullptr) {
1127     session_handle->resumable = true;
1128     session_handle->session_uri = request->GetResponseHeader("Location");
1129     if (session_handle->session_uri.empty()) {
1130       return errors::Internal("Unexpected response from GCS when writing to ",
1131                               gcs_path, ": 'Location' header not returned.");
1132     }
1133   }
1134   return OkStatus();
1135 }
1136 
UploadToSession(const std::string & session_uri,uint64 start_offset,uint64 already_uploaded,const std::string & tmp_content_filename,uint64 file_size,const std::string & file_path)1137 Status GcsFileSystem::UploadToSession(const std::string& session_uri,
1138                                       uint64 start_offset,
1139                                       uint64 already_uploaded,
1140                                       const std::string& tmp_content_filename,
1141                                       uint64 file_size,
1142                                       const std::string& file_path) {
1143   std::unique_ptr<HttpRequest> request;
1144   TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1145   request->SetUri(session_uri);
1146   if (file_size > 0) {
1147     request->AddHeader("Content-Range",
1148                        strings::StrCat("bytes ", already_uploaded, "-",
1149                                        file_size - start_offset - 1, "/",
1150                                        file_size - start_offset));
1151   }
1152   request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.write);
1153 
1154   TF_RETURN_IF_ERROR(request->SetPutFromFile(tmp_content_filename,
1155                                              start_offset + already_uploaded));
1156   TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when uploading ",
1157                                   file_path);
1158   return OkStatus();
1159 }
1160 
RequestUploadSessionStatus(const string & session_uri,uint64 file_size,const std::string & gcs_path,bool * completed,uint64 * uploaded)1161 Status GcsFileSystem::RequestUploadSessionStatus(const string& session_uri,
1162                                                  uint64 file_size,
1163                                                  const std::string& gcs_path,
1164                                                  bool* completed,
1165                                                  uint64* uploaded) {
1166   CHECK(completed != nullptr) << "RequestUploadSessionStatus() called with out "
1167                                  "param 'completed' == nullptr.";  // Crash ok
1168   CHECK(uploaded != nullptr) << "RequestUploadSessionStatus() called with out "
1169                                 "param 'uploaded' == nullptr.";  // Crash ok
1170   std::unique_ptr<HttpRequest> request;
1171   TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1172   request->SetUri(session_uri);
1173   request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1174   request->AddHeader("Content-Range", strings::StrCat("bytes */", file_size));
1175   request->SetPutEmptyBody();
1176   Status status = request->Send();
1177   if (status.ok()) {
1178     *completed = true;
1179     return OkStatus();
1180   }
1181   *completed = false;
1182   if (request->GetResponseCode() != HTTP_CODE_RESUME_INCOMPLETE) {
1183     TF_RETURN_WITH_CONTEXT_IF_ERROR(status, " when resuming upload ", gcs_path);
1184   }
1185   const std::string received_range = request->GetResponseHeader("Range");
1186   if (received_range.empty()) {
1187     // This means GCS doesn't have any bytes of the file yet.
1188     *uploaded = 0;
1189   } else {
1190     StringPiece range_piece(received_range);
1191     absl::ConsumePrefix(&range_piece,
1192                         "bytes=");  // May or may not be present.
1193 
1194     auto return_error = [](const std::string& gcs_path,
1195                            const std::string& error_message) {
1196       return errors::Internal("Unexpected response from GCS when writing ",
1197                               gcs_path, ": ", error_message);
1198     };
1199 
1200     std::vector<string> range_strs = str_util::Split(range_piece, '-');
1201     if (range_strs.size() != 2) {
1202       return return_error(gcs_path, "Range header '" + received_range +
1203                                         "' could not be parsed.");
1204     }
1205 
1206     std::vector<int64_t> range_parts;
1207     for (const std::string& range_str : range_strs) {
1208       int64_t tmp;
1209       if (strings::safe_strto64(range_str, &tmp)) {
1210         range_parts.push_back(tmp);
1211       } else {
1212         return return_error(gcs_path, "Range header '" + received_range +
1213                                           "' could not be parsed.");
1214       }
1215     }
1216 
1217     if (range_parts[0] != 0) {
1218       return return_error(gcs_path, "The returned range '" + received_range +
1219                                         "' does not start at zero.");
1220     }
1221     // If GCS returned "Range: 0-10", this means 11 bytes were uploaded.
1222     *uploaded = range_parts[1] + 1;
1223   }
1224   return OkStatus();
1225 }
1226 
ParseGcsPathForScheme(StringPiece fname,string scheme,bool empty_object_ok,string * bucket,string * object)1227 Status GcsFileSystem::ParseGcsPathForScheme(StringPiece fname, string scheme,
1228                                             bool empty_object_ok,
1229                                             string* bucket, string* object) {
1230   StringPiece parsed_scheme, bucketp, objectp;
1231   io::ParseURI(fname, &parsed_scheme, &bucketp, &objectp);
1232   if (parsed_scheme != scheme) {
1233     return errors::InvalidArgument("GCS path doesn't start with 'gs://': ",
1234                                    fname);
1235   }
1236   *bucket = string(bucketp);
1237   if (bucket->empty() || *bucket == ".") {
1238     return errors::InvalidArgument("GCS path doesn't contain a bucket name: ",
1239                                    fname);
1240   }
1241   absl::ConsumePrefix(&objectp, "/");
1242   *object = string(objectp);
1243   if (!empty_object_ok && object->empty()) {
1244     return errors::InvalidArgument("GCS path doesn't contain an object name: ",
1245                                    fname);
1246   }
1247   return OkStatus();
1248 }
1249 
ParseGcsPath(StringPiece fname,bool empty_object_ok,string * bucket,string * object)1250 Status GcsFileSystem::ParseGcsPath(StringPiece fname, bool empty_object_ok,
1251                                    string* bucket, string* object) {
1252   return ParseGcsPathForScheme(fname, "gs", empty_object_ok, bucket, object);
1253 }
1254 
ClearFileCaches(const string & fname)1255 void GcsFileSystem::ClearFileCaches(const string& fname) {
1256   tf_shared_lock l(block_cache_lock_);
1257   file_block_cache_->RemoveFile(fname);
1258   stat_cache_->Delete(fname);
1259   // TODO(rxsang): Remove the patterns that matche the file in
1260   // MatchingPathsCache as well.
1261 }
1262 
NewWritableFile(const string & fname,TransactionToken * token,std::unique_ptr<WritableFile> * result)1263 Status GcsFileSystem::NewWritableFile(const string& fname,
1264                                       TransactionToken* token,
1265                                       std::unique_ptr<WritableFile>* result) {
1266   string bucket, object;
1267   TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
1268 
1269   auto session_creator =
1270       [this](uint64 start_offset, const std::string& object_to_upload,
1271              const std::string& bucket, uint64 file_size,
1272              const std::string& gcs_path, UploadSessionHandle* session_handle) {
1273         return CreateNewUploadSession(start_offset, object_to_upload, bucket,
1274                                       file_size, gcs_path, session_handle);
1275       };
1276   auto object_uploader =
1277       [this](const std::string& session_uri, uint64 start_offset,
1278              uint64 already_uploaded, const std::string& tmp_content_filename,
1279              uint64 file_size, const std::string& file_path) {
1280         return UploadToSession(session_uri, start_offset, already_uploaded,
1281                                tmp_content_filename, file_size, file_path);
1282       };
1283   auto status_poller = [this](const string& session_uri, uint64 file_size,
1284                               const std::string& gcs_path, bool* completed,
1285                               uint64* uploaded) {
1286     return RequestUploadSessionStatus(session_uri, file_size, gcs_path,
1287                                       completed, uploaded);
1288   };
1289 
1290   auto generation_getter = [this](const string& fname, const string& bucket,
1291                                   const string& object, int64* generation) {
1292     GcsFileStat stat;
1293     TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
1294         [&fname, &bucket, &object, &stat, this]() {
1295           return UncachedStatForObject(fname, bucket, object, &stat);
1296         },
1297         retry_config_));
1298     *generation = stat.generation_number;
1299     return OkStatus();
1300   };
1301 
1302   result->reset(new GcsWritableFile(
1303       bucket, object, this, &timeouts_,
1304       [this, fname]() { ClearFileCaches(fname); }, retry_config_,
1305       compose_append_, session_creator, object_uploader, status_poller,
1306       generation_getter));
1307   return OkStatus();
1308 }
1309 
1310 // Reads the file from GCS in chunks and stores it in a tmp file,
1311 // which is then passed to GcsWritableFile.
NewAppendableFile(const string & fname,TransactionToken * token,std::unique_ptr<WritableFile> * result)1312 Status GcsFileSystem::NewAppendableFile(const string& fname,
1313                                         TransactionToken* token,
1314                                         std::unique_ptr<WritableFile>* result) {
1315   std::unique_ptr<RandomAccessFile> reader;
1316   TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, token, &reader));
1317   std::unique_ptr<char[]> buffer(new char[kReadAppendableFileBufferSize]);
1318   Status status;
1319   uint64 offset = 0;
1320   StringPiece read_chunk;
1321 
1322   // Read the file from GCS in chunks and save it to a tmp file.
1323   string old_content_filename;
1324   TF_RETURN_IF_ERROR(GetTmpFilename(&old_content_filename));
1325   std::ofstream old_content(old_content_filename, std::ofstream::binary);
1326   while (true) {
1327     status = reader->Read(offset, kReadAppendableFileBufferSize, &read_chunk,
1328                           buffer.get());
1329     if (status.ok()) {
1330       old_content << read_chunk;
1331       offset += kReadAppendableFileBufferSize;
1332     } else if (status.code() == error::NOT_FOUND) {
1333       // New file, there is no existing content in it.
1334       break;
1335     } else if (status.code() == error::OUT_OF_RANGE) {
1336       // Expected, this means we reached EOF.
1337       old_content << read_chunk;
1338       break;
1339     } else {
1340       return status;
1341     }
1342   }
1343   old_content.close();
1344 
1345   auto session_creator =
1346       [this](uint64 start_offset, const std::string& object_to_upload,
1347              const std::string& bucket, uint64 file_size,
1348              const std::string& gcs_path, UploadSessionHandle* session_handle) {
1349         return CreateNewUploadSession(start_offset, object_to_upload, bucket,
1350                                       file_size, gcs_path, session_handle);
1351       };
1352   auto object_uploader =
1353       [this](const std::string& session_uri, uint64 start_offset,
1354              uint64 already_uploaded, const std::string& tmp_content_filename,
1355              uint64 file_size, const std::string& file_path) {
1356         return UploadToSession(session_uri, start_offset, already_uploaded,
1357                                tmp_content_filename, file_size, file_path);
1358       };
1359 
1360   auto status_poller = [this](const string& session_uri, uint64 file_size,
1361                               const std::string& gcs_path, bool* completed,
1362                               uint64* uploaded) {
1363     return RequestUploadSessionStatus(session_uri, file_size, gcs_path,
1364                                       completed, uploaded);
1365   };
1366 
1367   auto generation_getter = [this](const string& fname, const string& bucket,
1368                                   const string& object, int64* generation) {
1369     GcsFileStat stat;
1370     TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
1371         [&fname, &bucket, &object, &stat, this]() {
1372           return UncachedStatForObject(fname, bucket, object, &stat);
1373         },
1374         retry_config_));
1375     *generation = stat.generation_number;
1376     return OkStatus();
1377   };
1378 
1379   // Create a writable file and pass the old content to it.
1380   string bucket, object;
1381   TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
1382   result->reset(new GcsWritableFile(
1383       bucket, object, this, old_content_filename, &timeouts_,
1384       [this, fname]() { ClearFileCaches(fname); }, retry_config_,
1385       compose_append_, session_creator, object_uploader, status_poller,
1386       generation_getter));
1387   return OkStatus();
1388 }
1389 
NewReadOnlyMemoryRegionFromFile(const string & fname,TransactionToken * token,std::unique_ptr<ReadOnlyMemoryRegion> * result)1390 Status GcsFileSystem::NewReadOnlyMemoryRegionFromFile(
1391     const string& fname, TransactionToken* token,
1392     std::unique_ptr<ReadOnlyMemoryRegion>* result) {
1393   uint64 size;
1394   TF_RETURN_IF_ERROR(GetFileSize(fname, token, &size));
1395   std::unique_ptr<char[]> data(new char[size]);
1396 
1397   std::unique_ptr<RandomAccessFile> file;
1398   TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, token, &file));
1399 
1400   StringPiece piece;
1401   TF_RETURN_IF_ERROR(file->Read(0, size, &piece, data.get()));
1402 
1403   result->reset(new GcsReadOnlyMemoryRegion(std::move(data), size));
1404   return OkStatus();
1405 }
1406 
FileExists(const string & fname,TransactionToken * token)1407 Status GcsFileSystem::FileExists(const string& fname, TransactionToken* token) {
1408   string bucket, object;
1409   TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object));
1410   if (object.empty()) {
1411     bool result;
1412     TF_RETURN_IF_ERROR(BucketExists(bucket, &result));
1413     if (result) {
1414       return OkStatus();
1415     }
1416   }
1417 
1418   // Check if the object exists.
1419   GcsFileStat stat;
1420   const Status status = StatForObject(fname, bucket, object, &stat);
1421   if (!errors::IsNotFound(status)) {
1422     return status;
1423   }
1424 
1425   // Check if the folder exists.
1426   bool result;
1427   TF_RETURN_IF_ERROR(FolderExists(fname, &result));
1428   if (result) {
1429     return OkStatus();
1430   }
1431   return errors::NotFound("The specified path ", fname, " was not found.");
1432 }
1433 
ObjectExists(const string & fname,const string & bucket,const string & object,bool * result)1434 Status GcsFileSystem::ObjectExists(const string& fname, const string& bucket,
1435                                    const string& object, bool* result) {
1436   GcsFileStat stat;
1437   const Status status = StatForObject(fname, bucket, object, &stat);
1438   switch (static_cast<int>(status.code())) {
1439     case static_cast<int>(error::Code::OK):
1440       *result = !stat.base.is_directory;
1441       return OkStatus();
1442     case static_cast<int>(error::Code::NOT_FOUND):
1443       *result = false;
1444       return OkStatus();
1445     default:
1446       return status;
1447   }
1448 }
1449 
UncachedStatForObject(const string & fname,const string & bucket,const string & object,GcsFileStat * stat)1450 Status GcsFileSystem::UncachedStatForObject(const string& fname,
1451                                             const string& bucket,
1452                                             const string& object,
1453                                             GcsFileStat* stat) {
1454   std::vector<char> output_buffer;
1455   std::unique_ptr<HttpRequest> request;
1456   TF_RETURN_WITH_CONTEXT_IF_ERROR(CreateHttpRequest(&request),
1457                                   " when reading metadata of gs://", bucket,
1458                                   "/", object);
1459 
1460   request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket, "/o/",
1461                                   request->EscapeString(object),
1462                                   "?fields=size%2Cgeneration%2Cupdated"));
1463   request->SetResultBuffer(&output_buffer);
1464   request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1465 
1466   if (stats_ != nullptr) {
1467     stats_->RecordStatObjectRequest();
1468   }
1469 
1470   TF_RETURN_WITH_CONTEXT_IF_ERROR(
1471       request->Send(), " when reading metadata of gs://", bucket, "/", object);
1472 
1473   Json::Value root;
1474   TF_RETURN_IF_ERROR(ParseJson(output_buffer, &root));
1475 
1476   // Parse file size.
1477   TF_RETURN_IF_ERROR(GetInt64Value(root, "size", &stat->base.length));
1478 
1479   // Parse generation number.
1480   TF_RETURN_IF_ERROR(
1481       GetInt64Value(root, "generation", &stat->generation_number));
1482 
1483   // Parse file modification time.
1484   string updated;
1485   TF_RETURN_IF_ERROR(GetStringValue(root, "updated", &updated));
1486   TF_RETURN_IF_ERROR(ParseRfc3339Time(updated, &(stat->base.mtime_nsec)));
1487 
1488   VLOG(1) << "Stat of: gs://" << bucket << "/" << object << " -- "
1489           << " length: " << stat->base.length
1490           << " generation: " << stat->generation_number
1491           << "; mtime_nsec: " << stat->base.mtime_nsec
1492           << "; updated: " << updated;
1493 
1494   if (str_util::EndsWith(fname, "/")) {
1495     // In GCS a path can be both a directory and a file, both it is uncommon for
1496     // other file systems. To avoid the ambiguity, if a path ends with "/" in
1497     // GCS, we always regard it as a directory mark or a virtual directory.
1498     stat->base.is_directory = true;
1499   } else {
1500     stat->base.is_directory = false;
1501   }
1502   return OkStatus();
1503 }
1504 
StatForObject(const string & fname,const string & bucket,const string & object,GcsFileStat * stat)1505 Status GcsFileSystem::StatForObject(const string& fname, const string& bucket,
1506                                     const string& object, GcsFileStat* stat) {
1507   if (object.empty()) {
1508     return errors::InvalidArgument(strings::Printf(
1509         "'object' must be a non-empty string. (File: %s)", fname.c_str()));
1510   }
1511 
1512   TF_RETURN_IF_ERROR(stat_cache_->LookupOrCompute(
1513       fname, stat,
1514       [this, &bucket, &object](const string& fname, GcsFileStat* stat) {
1515         return UncachedStatForObject(fname, bucket, object, stat);
1516       }));
1517   return OkStatus();
1518 }
1519 
BucketExists(const string & bucket,bool * result)1520 Status GcsFileSystem::BucketExists(const string& bucket, bool* result) {
1521   const Status status = GetBucketMetadata(bucket, nullptr);
1522   switch (status.code()) {
1523     case errors::Code::OK:
1524       *result = true;
1525       return OkStatus();
1526     case errors::Code::NOT_FOUND:
1527       *result = false;
1528       return OkStatus();
1529     default:
1530       return status;
1531   }
1532 }
1533 
CheckBucketLocationConstraint(const string & bucket)1534 Status GcsFileSystem::CheckBucketLocationConstraint(const string& bucket) {
1535   if (allowed_locations_.empty()) {
1536     return OkStatus();
1537   }
1538 
1539   // Avoid calling external API's in the constructor
1540   if (allowed_locations_.erase(kDetectZoneSentinelValue) == 1) {
1541     string zone;
1542     TF_RETURN_IF_ERROR(zone_provider_->GetZone(&zone));
1543     allowed_locations_.insert(ZoneToRegion(&zone));
1544   }
1545 
1546   string location;
1547   TF_RETURN_IF_ERROR(GetBucketLocation(bucket, &location));
1548   if (allowed_locations_.find(location) != allowed_locations_.end()) {
1549     return OkStatus();
1550   }
1551 
1552   return errors::FailedPrecondition(strings::Printf(
1553       "Bucket '%s' is in '%s' location, allowed locations are: (%s).",
1554       bucket.c_str(), location.c_str(),
1555       absl::StrJoin(allowed_locations_, ", ").c_str()));
1556 }
1557 
GetBucketLocation(const string & bucket,string * location)1558 Status GcsFileSystem::GetBucketLocation(const string& bucket,
1559                                         string* location) {
1560   auto compute_func = [this](const string& bucket, string* location) {
1561     std::vector<char> result_buffer;
1562     Status status = GetBucketMetadata(bucket, &result_buffer);
1563     Json::Value result;
1564     TF_RETURN_IF_ERROR(ParseJson(result_buffer, &result));
1565     string bucket_location;
1566     TF_RETURN_IF_ERROR(
1567         GetStringValue(result, kBucketMetadataLocationKey, &bucket_location));
1568     // Lowercase the GCS location to be case insensitive for allowed locations.
1569     *location = absl::AsciiStrToLower(bucket_location);
1570     return OkStatus();
1571   };
1572 
1573   TF_RETURN_IF_ERROR(
1574       bucket_location_cache_->LookupOrCompute(bucket, location, compute_func));
1575 
1576   return OkStatus();
1577 }
1578 
GetBucketMetadata(const string & bucket,std::vector<char> * result_buffer)1579 Status GcsFileSystem::GetBucketMetadata(const string& bucket,
1580                                         std::vector<char>* result_buffer) {
1581   std::unique_ptr<HttpRequest> request;
1582   TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1583   request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket));
1584 
1585   if (result_buffer != nullptr) {
1586     request->SetResultBuffer(result_buffer);
1587   }
1588 
1589   request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1590   return request->Send();
1591 }
1592 
FolderExists(const string & dirname,bool * result)1593 Status GcsFileSystem::FolderExists(const string& dirname, bool* result) {
1594   StatCache::ComputeFunc compute_func = [this](const string& dirname,
1595                                                GcsFileStat* stat) {
1596     std::vector<string> children;
1597     TF_RETURN_IF_ERROR(
1598         GetChildrenBounded(dirname, 1, &children, true /* recursively */,
1599                            true /* include_self_directory_marker */));
1600     if (!children.empty()) {
1601       stat->base = DIRECTORY_STAT;
1602       return OkStatus();
1603     } else {
1604       return errors::InvalidArgument("Not a directory!");
1605     }
1606   };
1607   GcsFileStat stat;
1608   Status s = stat_cache_->LookupOrCompute(MaybeAppendSlash(dirname), &stat,
1609                                           compute_func);
1610   if (s.ok()) {
1611     *result = stat.base.is_directory;
1612     return OkStatus();
1613   }
1614   if (errors::IsInvalidArgument(s)) {
1615     *result = false;
1616     return OkStatus();
1617   }
1618   return s;
1619 }
1620 
GetChildren(const string & dirname,TransactionToken * token,std::vector<string> * result)1621 Status GcsFileSystem::GetChildren(const string& dirname,
1622                                   TransactionToken* token,
1623                                   std::vector<string>* result) {
1624   return GetChildrenBounded(dirname, UINT64_MAX, result,
1625                             false /* recursively */,
1626                             false /* include_self_directory_marker */);
1627 }
1628 
GetMatchingPaths(const string & pattern,TransactionToken * token,std::vector<string> * results)1629 Status GcsFileSystem::GetMatchingPaths(const string& pattern,
1630                                        TransactionToken* token,
1631                                        std::vector<string>* results) {
1632   MatchingPathsCache::ComputeFunc compute_func =
1633       [this](const string& pattern, std::vector<string>* results) {
1634         results->clear();
1635         // Find the fixed prefix by looking for the first wildcard.
1636         const string& fixed_prefix =
1637             pattern.substr(0, pattern.find_first_of("*?[\\"));
1638         const string dir(this->Dirname(fixed_prefix));
1639         if (dir.empty()) {
1640           return errors::InvalidArgument(
1641               "A GCS pattern doesn't have a bucket name: ", pattern);
1642         }
1643         std::vector<string> all_files;
1644         TF_RETURN_IF_ERROR(GetChildrenBounded(
1645             dir, UINT64_MAX, &all_files, true /* recursively */,
1646             false /* include_self_directory_marker */));
1647 
1648         const auto& files_and_folders = AddAllSubpaths(all_files);
1649 
1650         // To handle `/` in the object names, we need to remove it from `dir`
1651         // and then use `StrCat` to insert it back.
1652         const StringPiece dir_no_slash = str_util::StripSuffix(dir, "/");
1653 
1654         // Match all obtained paths to the input pattern.
1655         for (const auto& path : files_and_folders) {
1656           // Manually construct the path instead of using `JoinPath` for the
1657           // cases where `path` starts with a `/` (which is a valid character in
1658           // the filenames of GCS objects). `JoinPath` canonicalizes the result,
1659           // removing duplicate slashes. We know that `dir_no_slash` does not
1660           // end in `/`, so we are safe inserting the new `/` here as the path
1661           // separator.
1662           const string full_path = strings::StrCat(dir_no_slash, "/", path);
1663           if (this->Match(full_path, pattern)) {
1664             results->push_back(full_path);
1665           }
1666         }
1667         return OkStatus();
1668       };
1669   TF_RETURN_IF_ERROR(
1670       matching_paths_cache_->LookupOrCompute(pattern, results, compute_func));
1671   return OkStatus();
1672 }
1673 
GetChildrenBounded(const string & dirname,uint64 max_results,std::vector<string> * result,bool recursive,bool include_self_directory_marker)1674 Status GcsFileSystem::GetChildrenBounded(const string& dirname,
1675                                          uint64 max_results,
1676                                          std::vector<string>* result,
1677                                          bool recursive,
1678                                          bool include_self_directory_marker) {
1679   if (!result) {
1680     return errors::InvalidArgument("'result' cannot be null");
1681   }
1682   string bucket, object_prefix;
1683   TF_RETURN_IF_ERROR(
1684       ParseGcsPath(MaybeAppendSlash(dirname), true, &bucket, &object_prefix));
1685 
1686   string nextPageToken;
1687   uint64 retrieved_results = 0;
1688   while (true) {  // A loop over multiple result pages.
1689     std::vector<char> output_buffer;
1690     std::unique_ptr<HttpRequest> request;
1691     TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1692     auto uri = strings::StrCat(kGcsUriBase, "b/", bucket, "/o");
1693     if (recursive) {
1694       uri = strings::StrCat(uri, "?fields=items%2Fname%2CnextPageToken");
1695     } else {
1696       // Set "/" as a delimiter to ask GCS to treat subfolders as children
1697       // and return them in "prefixes".
1698       uri = strings::StrCat(uri,
1699                             "?fields=items%2Fname%2Cprefixes%2CnextPageToken");
1700       uri = strings::StrCat(uri, "&delimiter=%2F");
1701     }
1702     if (!object_prefix.empty()) {
1703       uri = strings::StrCat(uri,
1704                             "&prefix=", request->EscapeString(object_prefix));
1705     }
1706     if (!nextPageToken.empty()) {
1707       uri = strings::StrCat(
1708           uri, "&pageToken=", request->EscapeString(nextPageToken));
1709     }
1710     if (max_results - retrieved_results < kGetChildrenDefaultPageSize) {
1711       uri =
1712           strings::StrCat(uri, "&maxResults=", max_results - retrieved_results);
1713     }
1714     request->SetUri(uri);
1715     request->SetResultBuffer(&output_buffer);
1716     request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1717 
1718     TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when reading ", dirname);
1719     Json::Value root;
1720     TF_RETURN_IF_ERROR(ParseJson(output_buffer, &root));
1721     const auto items = root.get("items", Json::Value::null);
1722     if (!items.isNull()) {
1723       if (!items.isArray()) {
1724         return errors::Internal(
1725             "Expected an array 'items' in the GCS response.");
1726       }
1727       for (size_t i = 0; i < items.size(); i++) {
1728         const auto item = items.get(i, Json::Value::null);
1729         if (!item.isObject()) {
1730           return errors::Internal(
1731               "Unexpected JSON format: 'items' should be a list of objects.");
1732         }
1733         string name;
1734         TF_RETURN_IF_ERROR(GetStringValue(item, "name", &name));
1735         // The names should be relative to the 'dirname'. That means the
1736         // 'object_prefix', which is part of 'dirname', should be removed from
1737         // the beginning of 'name'.
1738         StringPiece relative_path(name);
1739         if (!absl::ConsumePrefix(&relative_path, object_prefix)) {
1740           return errors::Internal(strings::StrCat(
1741               "Unexpected response: the returned file name ", name,
1742               " doesn't match the prefix ", object_prefix));
1743         }
1744         if (!relative_path.empty() || include_self_directory_marker) {
1745           result->emplace_back(relative_path);
1746         }
1747         if (++retrieved_results >= max_results) {
1748           return OkStatus();
1749         }
1750       }
1751     }
1752     const auto prefixes = root.get("prefixes", Json::Value::null);
1753     if (!prefixes.isNull()) {
1754       // Subfolders are returned for the non-recursive mode.
1755       if (!prefixes.isArray()) {
1756         return errors::Internal(
1757             "'prefixes' was expected to be an array in the GCS response.");
1758       }
1759       for (size_t i = 0; i < prefixes.size(); i++) {
1760         const auto prefix = prefixes.get(i, Json::Value::null);
1761         if (prefix.isNull() || !prefix.isString()) {
1762           return errors::Internal(
1763               "'prefixes' was expected to be an array of strings in the GCS "
1764               "response.");
1765         }
1766         const string& prefix_str = prefix.asString();
1767         StringPiece relative_path(prefix_str);
1768         if (!absl::ConsumePrefix(&relative_path, object_prefix)) {
1769           return errors::Internal(
1770               "Unexpected response: the returned folder name ", prefix_str,
1771               " doesn't match the prefix ", object_prefix);
1772         }
1773         result->emplace_back(relative_path);
1774         if (++retrieved_results >= max_results) {
1775           return OkStatus();
1776         }
1777       }
1778     }
1779     const auto token = root.get("nextPageToken", Json::Value::null);
1780     if (token.isNull()) {
1781       return OkStatus();
1782     }
1783     if (!token.isString()) {
1784       return errors::Internal(
1785           "Unexpected response: nextPageToken is not a string");
1786     }
1787     nextPageToken = token.asString();
1788   }
1789 }
1790 
Stat(const string & fname,TransactionToken * token,FileStatistics * stat)1791 Status GcsFileSystem::Stat(const string& fname, TransactionToken* token,
1792                            FileStatistics* stat) {
1793   if (!stat) {
1794     return errors::Internal("'stat' cannot be nullptr.");
1795   }
1796   string bucket, object;
1797   TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object));
1798   if (object.empty()) {
1799     bool is_bucket;
1800     TF_RETURN_IF_ERROR(BucketExists(bucket, &is_bucket));
1801     if (is_bucket) {
1802       *stat = DIRECTORY_STAT;
1803       return OkStatus();
1804     }
1805     return errors::NotFound("The specified bucket ", fname, " was not found.");
1806   }
1807 
1808   GcsFileStat gcs_stat;
1809   const Status status = StatForObject(fname, bucket, object, &gcs_stat);
1810   if (status.ok()) {
1811     *stat = gcs_stat.base;
1812     return OkStatus();
1813   }
1814   if (!errors::IsNotFound(status)) {
1815     return status;
1816   }
1817   bool is_folder;
1818   TF_RETURN_IF_ERROR(FolderExists(fname, &is_folder));
1819   if (is_folder) {
1820     *stat = DIRECTORY_STAT;
1821     return OkStatus();
1822   }
1823   return errors::NotFound("The specified path ", fname, " was not found.");
1824 }
1825 
DeleteFile(const string & fname,TransactionToken * token)1826 Status GcsFileSystem::DeleteFile(const string& fname, TransactionToken* token) {
1827   string bucket, object;
1828   TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
1829 
1830   std::unique_ptr<HttpRequest> request;
1831   TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1832   request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket, "/o/",
1833                                   request->EscapeString(object)));
1834   request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1835   request->SetDeleteRequest();
1836 
1837   TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when deleting ", fname);
1838   ClearFileCaches(fname);
1839   return OkStatus();
1840 }
1841 
CreateDir(const string & dirname,TransactionToken * token)1842 Status GcsFileSystem::CreateDir(const string& dirname,
1843                                 TransactionToken* token) {
1844   string dirname_with_slash = MaybeAppendSlash(dirname);
1845   VLOG(3) << "CreateDir: creating directory with dirname: " << dirname
1846           << " and dirname_with_slash: " << dirname_with_slash;
1847   string bucket, object;
1848   TF_RETURN_IF_ERROR(ParseGcsPath(dirname_with_slash, /*empty_object_ok=*/true,
1849                                   &bucket, &object));
1850   if (object.empty()) {
1851     bool is_bucket;
1852     TF_RETURN_IF_ERROR(BucketExists(bucket, &is_bucket));
1853     return is_bucket ? OkStatus()
1854                      : errors::NotFound("The specified bucket ",
1855                                         dirname_with_slash, " was not found.");
1856   }
1857 
1858   if (FileExists(dirname_with_slash, token).ok()) {
1859     // Use the original name for a correct error here.
1860     VLOG(3) << "CreateDir: directory already exists, not uploading " << dirname;
1861     return errors::AlreadyExists(dirname);
1862   }
1863 
1864   std::unique_ptr<HttpRequest> request;
1865   TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1866 
1867   request->SetUri(strings::StrCat(
1868       kGcsUploadUriBase, "b/", bucket,
1869       "/o?uploadType=media&name=", request->EscapeString(object),
1870       // Adding this parameter means HTTP_CODE_PRECONDITION_FAILED
1871       // will be returned if the object already exists, so avoid reuploading.
1872       "&ifGenerationMatch=0"));
1873 
1874   request->SetPostEmptyBody();
1875   request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1876   const Status& status = request->Send();
1877   if (status.ok()) {
1878     VLOG(3) << "CreateDir: finished uploading directory " << dirname;
1879     return OkStatus();
1880   }
1881   if (request->GetResponseCode() != HTTP_CODE_PRECONDITION_FAILED) {
1882     TF_RETURN_WITH_CONTEXT_IF_ERROR(status, " when uploading ",
1883                                     dirname_with_slash);
1884   }
1885   VLOG(3) << "Ignoring directory already exists on object "
1886           << dirname_with_slash;
1887   return errors::AlreadyExists(dirname);
1888 }
1889 
1890 // Checks that the directory is empty (i.e no objects with this prefix exist).
1891 // Deletes the GCS directory marker if it exists.
DeleteDir(const string & dirname,TransactionToken * token)1892 Status GcsFileSystem::DeleteDir(const string& dirname,
1893                                 TransactionToken* token) {
1894   std::vector<string> children;
1895   // A directory is considered empty either if there are no matching objects
1896   // with the corresponding name prefix or if there is exactly one matching
1897   // object and it is the directory marker. Therefore we need to retrieve
1898   // at most two children for the prefix to detect if a directory is empty.
1899   TF_RETURN_IF_ERROR(
1900       GetChildrenBounded(dirname, 2, &children, true /* recursively */,
1901                          true /* include_self_directory_marker */));
1902 
1903   if (children.size() > 1 || (children.size() == 1 && !children[0].empty())) {
1904     return errors::FailedPrecondition("Cannot delete a non-empty directory.");
1905   }
1906   if (children.size() == 1 && children[0].empty()) {
1907     // This is the directory marker object. Delete it.
1908     return DeleteFile(MaybeAppendSlash(dirname), token);
1909   }
1910   return OkStatus();
1911 }
1912 
GetFileSize(const string & fname,TransactionToken * token,uint64 * file_size)1913 Status GcsFileSystem::GetFileSize(const string& fname, TransactionToken* token,
1914                                   uint64* file_size) {
1915   if (!file_size) {
1916     return errors::Internal("'file_size' cannot be nullptr.");
1917   }
1918 
1919   // Only validate the name.
1920   string bucket, object;
1921   TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
1922 
1923   FileStatistics stat;
1924   TF_RETURN_IF_ERROR(Stat(fname, token, &stat));
1925   *file_size = stat.length;
1926   return OkStatus();
1927 }
1928 
RenameFile(const string & src,const string & target,TransactionToken * token)1929 Status GcsFileSystem::RenameFile(const string& src, const string& target,
1930                                  TransactionToken* token) {
1931   if (!IsDirectory(src, token).ok()) {
1932     return RenameObject(src, target);
1933   }
1934   // Rename all individual objects in the directory one by one.
1935   std::vector<string> children;
1936   TF_RETURN_IF_ERROR(
1937       GetChildrenBounded(src, UINT64_MAX, &children, true /* recursively */,
1938                          true /* include_self_directory_marker */));
1939   for (const string& subpath : children) {
1940     TF_RETURN_IF_ERROR(
1941         RenameObject(JoinGcsPath(src, subpath), JoinGcsPath(target, subpath)));
1942   }
1943   return OkStatus();
1944 }
1945 
1946 // Uses a GCS API command to copy the object and then deletes the old one.
RenameObject(const string & src,const string & target)1947 Status GcsFileSystem::RenameObject(const string& src, const string& target) {
1948   VLOG(3) << "RenameObject: started gs://" << src << " to " << target;
1949   string src_bucket, src_object, target_bucket, target_object;
1950   TF_RETURN_IF_ERROR(ParseGcsPath(src, false, &src_bucket, &src_object));
1951   TF_RETURN_IF_ERROR(
1952       ParseGcsPath(target, false, &target_bucket, &target_object));
1953 
1954   std::unique_ptr<HttpRequest> request;
1955   TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1956   request->SetUri(strings::StrCat(kGcsUriBase, "b/", src_bucket, "/o/",
1957                                   request->EscapeString(src_object),
1958                                   "/rewriteTo/b/", target_bucket, "/o/",
1959                                   request->EscapeString(target_object)));
1960   request->SetPostEmptyBody();
1961   request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1962   std::vector<char> output_buffer;
1963   request->SetResultBuffer(&output_buffer);
1964   TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when renaming ", src,
1965                                   " to ", target);
1966   // Flush the target from the caches.  The source will be flushed in the
1967   // DeleteFile call below.
1968   ClearFileCaches(target);
1969   Json::Value root;
1970   TF_RETURN_IF_ERROR(ParseJson(output_buffer, &root));
1971   bool done;
1972   TF_RETURN_IF_ERROR(GetBoolValue(root, "done", &done));
1973   if (!done) {
1974     // If GCS didn't complete rewrite in one call, this means that a large file
1975     // is being copied to a bucket with a different storage class or location,
1976     // which requires multiple rewrite calls.
1977     // TODO(surkov): implement multi-step rewrites.
1978     return errors::Unimplemented(
1979         "Couldn't rename ", src, " to ", target,
1980         ": moving large files between buckets with different "
1981         "locations or storage classes is not supported.");
1982   }
1983 
1984   VLOG(3) << "RenameObject: finished from: gs://" << src << " to " << target;
1985   // In case the delete API call failed, but the deletion actually happened
1986   // on the server side, we can't just retry the whole RenameFile operation
1987   // because the source object is already gone.
1988   return RetryingUtils::DeleteWithRetries(
1989       [this, &src]() { return DeleteFile(src, nullptr); }, retry_config_);
1990 }
1991 
IsDirectory(const string & fname,TransactionToken * token)1992 Status GcsFileSystem::IsDirectory(const string& fname,
1993                                   TransactionToken* token) {
1994   string bucket, object;
1995   TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object));
1996   if (object.empty()) {
1997     bool is_bucket;
1998     TF_RETURN_IF_ERROR(BucketExists(bucket, &is_bucket));
1999     if (is_bucket) {
2000       return OkStatus();
2001     }
2002     return errors::NotFound("The specified bucket gs://", bucket,
2003                             " was not found.");
2004   }
2005   bool is_folder;
2006   TF_RETURN_IF_ERROR(FolderExists(fname, &is_folder));
2007   if (is_folder) {
2008     return OkStatus();
2009   }
2010   bool is_object;
2011   TF_RETURN_IF_ERROR(ObjectExists(fname, bucket, object, &is_object));
2012   if (is_object) {
2013     return errors::FailedPrecondition("The specified path ", fname,
2014                                       " is not a directory.");
2015   }
2016   return errors::NotFound("The specified path ", fname, " was not found.");
2017 }
2018 
DeleteRecursively(const string & dirname,TransactionToken * token,int64_t * undeleted_files,int64_t * undeleted_dirs)2019 Status GcsFileSystem::DeleteRecursively(const string& dirname,
2020                                         TransactionToken* token,
2021                                         int64_t* undeleted_files,
2022                                         int64_t* undeleted_dirs) {
2023   if (!undeleted_files || !undeleted_dirs) {
2024     return errors::Internal(
2025         "'undeleted_files' and 'undeleted_dirs' cannot be nullptr.");
2026   }
2027   *undeleted_files = 0;
2028   *undeleted_dirs = 0;
2029   if (!IsDirectory(dirname, token).ok()) {
2030     *undeleted_dirs = 1;
2031     return Status(
2032         error::NOT_FOUND,
2033         strings::StrCat(dirname, " doesn't exist or not a directory."));
2034   }
2035   std::vector<string> all_objects;
2036   // Get all children in the directory recursively.
2037   TF_RETURN_IF_ERROR(GetChildrenBounded(
2038       dirname, UINT64_MAX, &all_objects, true /* recursively */,
2039       true /* include_self_directory_marker */));
2040   for (const string& object : all_objects) {
2041     const string& full_path = JoinGcsPath(dirname, object);
2042     // Delete all objects including directory markers for subfolders.
2043     // Since DeleteRecursively returns OK if individual file deletions fail,
2044     // and therefore RetryingFileSystem won't pay attention to the failures,
2045     // we need to make sure these failures are properly retried.
2046     const auto& delete_file_status = RetryingUtils::DeleteWithRetries(
2047         [this, &full_path, token]() { return DeleteFile(full_path, token); },
2048         retry_config_);
2049     if (!delete_file_status.ok()) {
2050       if (IsDirectory(full_path, token).ok()) {
2051         // The object is a directory marker.
2052         (*undeleted_dirs)++;
2053       } else {
2054         (*undeleted_files)++;
2055       }
2056     }
2057   }
2058   return OkStatus();
2059 }
2060 
2061 // Flushes all caches for filesystem metadata and file contents. Useful for
2062 // reclaiming memory once filesystem operations are done (e.g. model is loaded),
2063 // or for resetting the filesystem to a consistent state.
FlushCaches(TransactionToken * token)2064 void GcsFileSystem::FlushCaches(TransactionToken* token) {
2065   tf_shared_lock l(block_cache_lock_);
2066   file_block_cache_->Flush();
2067   stat_cache_->Clear();
2068   matching_paths_cache_->Clear();
2069   bucket_location_cache_->Clear();
2070 }
2071 
SetStats(GcsStatsInterface * stats)2072 void GcsFileSystem::SetStats(GcsStatsInterface* stats) {
2073   CHECK(stats_ == nullptr) << "SetStats() has already been called.";
2074   CHECK(stats != nullptr);
2075   mutex_lock l(block_cache_lock_);
2076   stats_ = stats;
2077   stats_->Configure(this, &throttle_, file_block_cache_.get());
2078 }
2079 
SetCacheStats(FileBlockCacheStatsInterface * cache_stats)2080 void GcsFileSystem::SetCacheStats(FileBlockCacheStatsInterface* cache_stats) {
2081   tf_shared_lock l(block_cache_lock_);
2082   if (file_block_cache_ == nullptr) {
2083     LOG(ERROR) << "Tried to set cache stats of non-initialized file block "
2084                   "cache object. This may result in not exporting the intended "
2085                   "monitoring data";
2086     return;
2087   }
2088   file_block_cache_->SetStats(cache_stats);
2089 }
2090 
SetAuthProvider(std::unique_ptr<AuthProvider> auth_provider)2091 void GcsFileSystem::SetAuthProvider(
2092     std::unique_ptr<AuthProvider> auth_provider) {
2093   mutex_lock l(mu_);
2094   auth_provider_ = std::move(auth_provider);
2095 }
2096 
2097 // Creates an HttpRequest and sets several parameters that are common to all
2098 // requests.  All code (in GcsFileSystem) that creates an HttpRequest should
2099 // go through this method, rather than directly using http_request_factory_.
CreateHttpRequest(std::unique_ptr<HttpRequest> * request)2100 Status GcsFileSystem::CreateHttpRequest(std::unique_ptr<HttpRequest>* request) {
2101   std::unique_ptr<HttpRequest> new_request{http_request_factory_->Create()};
2102   if (dns_cache_) {
2103     dns_cache_->AnnotateRequest(new_request.get());
2104   }
2105 
2106   string auth_token;
2107   {
2108     tf_shared_lock l(mu_);
2109     TF_RETURN_IF_ERROR(
2110         AuthProvider::GetToken(auth_provider_.get(), &auth_token));
2111   }
2112 
2113   new_request->AddAuthBearerHeader(auth_token);
2114 
2115   if (additional_header_) {
2116     new_request->AddHeader(additional_header_->first,
2117                            additional_header_->second);
2118   }
2119 
2120   if (stats_ != nullptr) {
2121     new_request->SetRequestStats(stats_->HttpStats());
2122   }
2123 
2124   if (!throttle_.AdmitRequest()) {
2125     return errors::Unavailable("Request throttled");
2126   }
2127 
2128   *request = std::move(new_request);
2129   return OkStatus();
2130 }
2131 
2132 }  // namespace tensorflow
2133 
// Registers ::tensorflow::RetryingGcsFileSystem as the file system for paths
// with the "gs" scheme.
//
// NOTE(review): A TPU_GCS_FS option (set ON/OFF in the GCP TPU TensorFlow
// config) is meant to let tpu_gcs_file_system take over the "gs" scheme
// instead of this registration. That file system is a child of the GCS file
// system with TPU-pod-on-GCS optimizations. No preprocessor guard for
// TPU_GCS_FS is present here, so this registration is unconditional in this
// file. Confirm where TPU_GCS_FS is actually checked before relying on it.
REGISTER_LEGACY_FILE_SYSTEM("gs", ::tensorflow::RetryingGcsFileSystem);
2142