1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #include "tensorflow/core/platform/cloud/gcs_file_system.h"
17
18 #include <stdio.h>
19 #include <unistd.h>
20
21 #include <algorithm>
22 #include <cstdio>
23 #include <cstdlib>
24 #include <cstring>
25 #include <fstream>
26 #include <functional>
27 #include <string>
28 #include <utility>
29 #include <vector>
30
31 #include "tensorflow/core/platform/file_statistics.h"
32 #include "tensorflow/core/platform/strcat.h"
33 #ifdef _WIN32
34 #include <io.h> // for _mktemp
35 #endif
36 #include "absl/base/macros.h"
37 #include "json/json.h"
38 #include "tensorflow/core/lib/gtl/map_util.h"
39 #include "tensorflow/core/platform/cloud/curl_http_request.h"
40 #include "tensorflow/core/platform/cloud/file_block_cache.h"
41 #include "tensorflow/core/platform/cloud/google_auth_provider.h"
42 #include "tensorflow/core/platform/cloud/ram_file_block_cache.h"
43 #include "tensorflow/core/platform/cloud/time_util.h"
44 #include "tensorflow/core/platform/env.h"
45 #include "tensorflow/core/platform/errors.h"
46 #include "tensorflow/core/platform/mutex.h"
47 #include "tensorflow/core/platform/numbers.h"
48 #include "tensorflow/core/platform/path.h"
49 #include "tensorflow/core/platform/protobuf.h"
50 #include "tensorflow/core/platform/retrying_utils.h"
51 #include "tensorflow/core/platform/str_util.h"
52 #include "tensorflow/core/platform/stringprintf.h"
53 #include "tensorflow/core/platform/thread_annotations.h"
54 #include "tensorflow/core/profiler/lib/traceme.h"
55
56 #ifdef _WIN32
57 #ifdef DeleteFile
58 #undef DeleteFile
59 #endif
60 #endif
61
62 namespace tensorflow {
63 namespace {
64
// Base URL for GCS JSON API metadata/listing/compose/delete requests.
constexpr char kGcsUriBase[] = "https://www.googleapis.com/storage/v1/";
// Base URL for GCS JSON API upload (simple and resumable) requests.
constexpr char kGcsUploadUriBase[] =
    "https://www.googleapis.com/upload/storage/v1/";
// Host used for direct object downloads.
constexpr char kStorageHost[] = "storage.googleapis.com";
// JSON key holding a bucket's location in bucket metadata responses.
constexpr char kBucketMetadataLocationKey[] = "location";
constexpr size_t kReadAppendableFileBufferSize = 1024 * 1024;  // In bytes.
constexpr int kGetChildrenDefaultPageSize = 1000;
// The HTTP response code "308 Resume Incomplete".
constexpr uint64 HTTP_CODE_RESUME_INCOMPLETE = 308;
// The HTTP response code "412 Precondition Failed".
constexpr uint64 HTTP_CODE_PRECONDITION_FAILED = 412;
// The environment variable that overrides the size of the readahead buffer.
ABSL_DEPRECATED("Use GCS_READ_CACHE_BLOCK_SIZE_MB instead.")
constexpr char kReadaheadBufferSize[] = "GCS_READAHEAD_BUFFER_SIZE_BYTES";
// The environment variable that overrides the maximum age of entries in the
// Stat cache. A value of 0 means nothing is cached.
constexpr char kStatCacheMaxAge[] = "GCS_STAT_CACHE_MAX_AGE";
constexpr uint64 kStatCacheDefaultMaxAge = 5;
// The environment variable that overrides the maximum number of entries in the
// Stat cache.
constexpr char kStatCacheMaxEntries[] = "GCS_STAT_CACHE_MAX_ENTRIES";
constexpr size_t kStatCacheDefaultMaxEntries = 1024;
// The environment variable that overrides the maximum age of entries in the
// GetMatchingPaths cache. A value of 0 (the default) means nothing is cached.
constexpr char kMatchingPathsCacheMaxAge[] = "GCS_MATCHING_PATHS_CACHE_MAX_AGE";
constexpr uint64 kMatchingPathsCacheDefaultMaxAge = 0;
// The environment variable that overrides the maximum number of entries in the
// GetMatchingPaths cache.
constexpr char kMatchingPathsCacheMaxEntries[] =
    "GCS_MATCHING_PATHS_CACHE_MAX_ENTRIES";
constexpr size_t kMatchingPathsCacheDefaultMaxEntries = 1024;
// Number of bucket locations cached; most workloads won't touch more than one
// bucket so this limit is set fairly low.
constexpr size_t kBucketLocationCacheMaxEntries = 10;
// ExpiringLRUCache doesn't support any "cache forever" option, so the largest
// representable age is used instead.
constexpr size_t kCacheNeverExpire = std::numeric_limits<uint64>::max();
// The file statistics returned by Stat() for directories.
const FileStatistics DIRECTORY_STAT(0, 0, true);
// Some environments exhibit unreliable DNS resolution. Set this environment
// variable to a positive integer describing the frequency used to refresh the
// userspace DNS cache.
constexpr char kResolveCacheSecs[] = "GCS_RESOLVE_REFRESH_SECS";
// The environment variable to configure the http request's connection timeout.
constexpr char kRequestConnectionTimeout[] =
    "GCS_REQUEST_CONNECTION_TIMEOUT_SECS";
// The environment variable to configure the http request's idle timeout.
constexpr char kRequestIdleTimeout[] = "GCS_REQUEST_IDLE_TIMEOUT_SECS";
// The environment variable to configure the overall request timeout for
// metadata requests.
constexpr char kMetadataRequestTimeout[] = "GCS_METADATA_REQUEST_TIMEOUT_SECS";
// The environment variable to configure the overall request timeout for
// block reads requests.
constexpr char kReadRequestTimeout[] = "GCS_READ_REQUEST_TIMEOUT_SECS";
// The environment variable to configure the overall request timeout for
// upload requests.
constexpr char kWriteRequestTimeout[] = "GCS_WRITE_REQUEST_TIMEOUT_SECS";
// The environment variable to configure an additional header to send with
// all requests to GCS (format HEADERNAME:HEADERCONTENT).
constexpr char kAdditionalRequestHeader[] = "GCS_ADDITIONAL_REQUEST_HEADER";
// The environment variable to configure the throttle (format: <int64_t>).
constexpr char kThrottleRate[] = "GCS_THROTTLE_TOKEN_RATE";
// The environment variable to configure the token bucket size (format:
// <int64_t>).
constexpr char kThrottleBucket[] = "GCS_THROTTLE_BUCKET_SIZE";
// The environment variable that controls the number of tokens per request.
// (format: <int64_t>)
constexpr char kTokensPerRequest[] = "GCS_TOKENS_PER_REQUEST";
// The environment variable to configure the initial tokens (format: <int64_t>).
constexpr char kInitialTokens[] = "GCS_INITIAL_TOKENS";

// The environment variable to customize which GCS bucket locations are allowed,
// if the list is empty defaults to using the region of the zone (format, comma
// delimited list). Requires 'storage.buckets.get' permission.
constexpr char kAllowedBucketLocations[] = "GCS_ALLOWED_BUCKET_LOCATIONS";
// When this value is passed as an allowed location, the zone tensorflow is
// running in is detected and buckets are restricted to that region.
constexpr char kDetectZoneSentinelValue[] = "auto";

// How to upload new data when Flush() is called multiple times.
// By default the entire file is reuploaded.
constexpr char kAppendMode[] = "GCS_APPEND_MODE";
// If GCS_APPEND_MODE=compose then instead the new data is uploaded to a
// temporary object and composed with the original object. This is disabled by
// default as the multiple API calls required add a risk of stranding temporary
// objects.
constexpr char kComposeAppend[] = "compose";
151
GetTmpFilename(string * filename)152 Status GetTmpFilename(string* filename) {
153 *filename = io::GetTempFilename("");
154 return OkStatus();
155 }
156
157 /// Appends a trailing slash if the name doesn't already have one.
MaybeAppendSlash(const string & name)158 string MaybeAppendSlash(const string& name) {
159 if (name.empty()) {
160 return "/";
161 }
162 if (name.back() != '/') {
163 return strings::StrCat(name, "/");
164 }
165 return name;
166 }
167
168 // io::JoinPath() doesn't work in cases when we want an empty subpath
169 // to result in an appended slash in order for directory markers
170 // to be processed correctly: "gs://a/b" + "" should give "gs://a/b/".
JoinGcsPath(const string & path,const string & subpath)171 string JoinGcsPath(const string& path, const string& subpath) {
172 return strings::StrCat(MaybeAppendSlash(path), subpath);
173 }
174
175 /// \brief Returns the given paths appending all their subfolders.
176 ///
177 /// For every path X in the list, every subfolder in X is added to the
178 /// resulting list.
179 /// For example:
180 /// - for 'a/b/c/d' it will append 'a', 'a/b' and 'a/b/c'
181 /// - for 'a/b/c/' it will append 'a', 'a/b' and 'a/b/c'
182 /// - for 'a//b/c/' it will append 'a', 'a//b' and 'a//b/c'
183 /// - for '/a/b/c/' it will append '/a', '/a/b' and '/a/b/c'
AddAllSubpaths(const std::vector<string> & paths)184 std::set<string> AddAllSubpaths(const std::vector<string>& paths) {
185 std::set<string> result;
186 result.insert(paths.begin(), paths.end());
187 for (const string& path : paths) {
188 StringPiece subpath = io::Dirname(path);
189 // If `path` starts with `/`, `subpath` will be `/` and then we get into an
190 // infinite loop. Same behavior happens if there is a `//` pattern in
191 // `path`, so we check for that and leave the loop quicker.
192 while (!(subpath.empty() || subpath == "/")) {
193 result.emplace(string(subpath));
194 subpath = io::Dirname(subpath);
195 }
196 }
197 return result;
198 }
199
ParseJson(StringPiece json,Json::Value * result)200 Status ParseJson(StringPiece json, Json::Value* result) {
201 Json::Reader reader;
202 if (!reader.parse(json.data(), json.data() + json.size(), *result)) {
203 return errors::Internal("Couldn't parse JSON response from GCS.");
204 }
205 return OkStatus();
206 }
207
ParseJson(const std::vector<char> & json,Json::Value * result)208 Status ParseJson(const std::vector<char>& json, Json::Value* result) {
209 return ParseJson(StringPiece{json.data(), json.size()}, result);
210 }
211
212 /// Reads a JSON value with the given name from a parent JSON value.
GetValue(const Json::Value & parent,const char * name,Json::Value * result)213 Status GetValue(const Json::Value& parent, const char* name,
214 Json::Value* result) {
215 *result = parent.get(name, Json::Value::null);
216 if (result->isNull()) {
217 return errors::Internal("The field '", name,
218 "' was expected in the JSON response.");
219 }
220 return OkStatus();
221 }
222
223 /// Reads a string JSON value with the given name from a parent JSON value.
GetStringValue(const Json::Value & parent,const char * name,string * result)224 Status GetStringValue(const Json::Value& parent, const char* name,
225 string* result) {
226 Json::Value result_value;
227 TF_RETURN_IF_ERROR(GetValue(parent, name, &result_value));
228 if (!result_value.isString()) {
229 return errors::Internal(
230 "The field '", name,
231 "' in the JSON response was expected to be a string.");
232 }
233 *result = result_value.asString();
234 return OkStatus();
235 }
236
237 /// Reads a long JSON value with the given name from a parent JSON value.
GetInt64Value(const Json::Value & parent,const char * name,int64_t * result)238 Status GetInt64Value(const Json::Value& parent, const char* name,
239 int64_t* result) {
240 Json::Value result_value;
241 TF_RETURN_IF_ERROR(GetValue(parent, name, &result_value));
242 if (result_value.isNumeric()) {
243 *result = result_value.asInt64();
244 return OkStatus();
245 }
246 if (result_value.isString() &&
247 strings::safe_strto64(result_value.asCString(), result)) {
248 return OkStatus();
249 }
250 return errors::Internal(
251 "The field '", name,
252 "' in the JSON response was expected to be a number.");
253 }
254
255 /// Reads a boolean JSON value with the given name from a parent JSON value.
GetBoolValue(const Json::Value & parent,const char * name,bool * result)256 Status GetBoolValue(const Json::Value& parent, const char* name, bool* result) {
257 Json::Value result_value;
258 TF_RETURN_IF_ERROR(GetValue(parent, name, &result_value));
259 if (!result_value.isBool()) {
260 return errors::Internal(
261 "The field '", name,
262 "' in the JSON response was expected to be a boolean.");
263 }
264 *result = result_value.asBool();
265 return OkStatus();
266 }
267
268 /// A GCS-based implementation of a random access file with an LRU block cache.
269 class GcsRandomAccessFile : public RandomAccessFile {
270 public:
271 using ReadFn =
272 std::function<Status(const string& filename, uint64 offset, size_t n,
273 StringPiece* result, char* scratch)>;
274
GcsRandomAccessFile(const string & filename,ReadFn read_fn)275 GcsRandomAccessFile(const string& filename, ReadFn read_fn)
276 : filename_(filename), read_fn_(std::move(read_fn)) {}
277
Name(StringPiece * result) const278 Status Name(StringPiece* result) const override {
279 *result = filename_;
280 return OkStatus();
281 }
282
283 /// The implementation of reads with an LRU block cache. Thread safe.
Read(uint64 offset,size_t n,StringPiece * result,char * scratch) const284 Status Read(uint64 offset, size_t n, StringPiece* result,
285 char* scratch) const override {
286 return read_fn_(filename_, offset, n, result, scratch);
287 }
288
289 private:
290 /// The filename of this file.
291 const string filename_;
292 /// The implementation of the read operation (provided by the GCSFileSystem).
293 const ReadFn read_fn_;
294 };
295
/// A GCS-based implementation of a random access file with a read buffer.
///
/// Small reads are served from a single in-memory window (`buffer_`) so that
/// repeated sequential reads issue one GCS request per `buffer_size_` bytes.
class BufferedGcsRandomAccessFile : public RandomAccessFile {
 public:
  using ReadFn =
      std::function<Status(const string& filename, uint64 offset, size_t n,
                           StringPiece* result, char* scratch)>;

  // Initialize the reader. Provided read_fn should be thread safe.
  BufferedGcsRandomAccessFile(const string& filename, uint64 buffer_size,
                              ReadFn read_fn)
      : filename_(filename),
        read_fn_(std::move(read_fn)),
        buffer_size_(buffer_size),
        buffer_start_(0),
        buffer_end_is_past_eof_(false) {}

  // Reports the GCS path this file was opened with.
  Status Name(StringPiece* result) const override {
    *result = filename_;
    return OkStatus();
  }

  /// The implementation of reads with an read buffer. Thread safe.
  /// Returns `OUT_OF_RANGE` if fewer than n bytes were stored in `*result`
  /// because of EOF.
  Status Read(uint64 offset, size_t n, StringPiece* result,
              char* scratch) const override {
    // Requests larger than the buffer bypass it entirely and hit GCS
    // directly; buffering would not save a request.
    if (n > buffer_size_) {
      return read_fn_(filename_, offset, n, result, scratch);
    }
    {
      mutex_lock l(buffer_mutex_);
      size_t buffer_end = buffer_start_ + buffer_.size();
      size_t copy_size = 0;
      // First serve whatever prefix of the request overlaps the currently
      // buffered window.
      if (offset < buffer_end && offset >= buffer_start_) {
        copy_size = std::min(n, static_cast<size_t>(buffer_end - offset));
        memcpy(scratch, buffer_.data() + (offset - buffer_start_), copy_size);
        *result = StringPiece(scratch, copy_size);
      }
      // If the buffered window ends at EOF and we've consumed it all, there
      // is nothing more to fetch.
      bool consumed_buffer_to_eof =
          offset + copy_size >= buffer_end && buffer_end_is_past_eof_;
      if (copy_size < n && !consumed_buffer_to_eof) {
        // Not fully satisfied and EOF not yet seen: refill the buffer
        // starting where the partial copy left off.
        Status status = FillBuffer(offset + copy_size);
        if (!status.ok() && !errors::IsOutOfRange(status)) {
          // Empty the buffer to avoid caching bad reads.
          buffer_.resize(0);
          return status;
        }
        size_t remaining_copy = std::min(n - copy_size, buffer_.size());
        memcpy(scratch + copy_size, buffer_.data(), remaining_copy);
        copy_size += remaining_copy;
        *result = StringPiece(scratch, copy_size);
      }
      if (copy_size < n) {
        // Forget the end-of-file flag to allow for clients that poll on the
        // same file.
        buffer_end_is_past_eof_ = false;
        return errors::OutOfRange("EOF reached. Requested to read ", n,
                                  " bytes from ", offset, ".");
      }
    }
    return OkStatus();
  }

 private:
  // Refills `buffer_` with up to `buffer_size_` bytes starting at file
  // offset `start`, records whether EOF was reached, and shrinks the buffer
  // to the number of bytes actually read. Caller must hold buffer_mutex_.
  Status FillBuffer(uint64 start) const
      TF_EXCLUSIVE_LOCKS_REQUIRED(buffer_mutex_) {
    buffer_start_ = start;
    buffer_.resize(buffer_size_);
    StringPiece str_piece;
    Status status = read_fn_(filename_, buffer_start_, buffer_size_, &str_piece,
                             &(buffer_[0]));
    // OUT_OF_RANGE here means the read ran into EOF, which is remembered so
    // subsequent reads within the window don't re-request past the end.
    buffer_end_is_past_eof_ = errors::IsOutOfRange(status);
    buffer_.resize(str_piece.size());
    return status;
  }

  // The filename of this file.
  const string filename_;

  // The implementation of the read operation (provided by the GCSFileSystem).
  const ReadFn read_fn_;

  // Size of buffer that we read from GCS each time we send a request.
  const uint64 buffer_size_;

  // Mutex for buffering operations that can be accessed from multiple threads.
  // The following members are mutable in order to provide a const Read.
  mutable mutex buffer_mutex_;

  // Offset of buffer from start of the file.
  mutable uint64 buffer_start_ TF_GUARDED_BY(buffer_mutex_);

  // True iff the last FillBuffer hit EOF, i.e. the buffer ends at the file's
  // end as of that read.
  mutable bool buffer_end_is_past_eof_ TF_GUARDED_BY(buffer_mutex_);

  // The buffered bytes themselves; its size is the number of valid bytes.
  mutable string buffer_ TF_GUARDED_BY(buffer_mutex_);
};
392
// Function object declaration with params needed to create upload sessions.
typedef std::function<Status(
    uint64 start_offset, const std::string& object_to_upload,
    const std::string& bucket, uint64 file_size, const std::string& gcs_path,
    UploadSessionHandle* session_handle)>
    SessionCreator;

// Function object declaration with params needed to upload objects.
typedef std::function<Status(const std::string& session_uri,
                             uint64 start_offset, uint64 already_uploaded,
                             const std::string& tmp_content_filename,
                             uint64 file_size, const std::string& file_path)>
    ObjectUploader;

// Function object declaration with params needed to poll upload status.
typedef std::function<Status(const string& session_uri, uint64 file_size,
                             const std::string& gcs_path, bool* completed,
                             uint64* uploaded)>
    StatusPoller;

// Function object declaration with params needed to look up an object's
// current generation number (used as a compose precondition).
typedef std::function<Status(const string& fname, const string& bucket,
                             const string& object, int64_t* generation)>
    GenerationGetter;
417
418 /// \brief GCS-based implementation of a writeable file.
419 ///
420 /// Since GCS objects are immutable, this implementation writes to a local
421 /// tmp file and copies it to GCS on flush/close.
422 class GcsWritableFile : public WritableFile {
423 public:
GcsWritableFile(const string & bucket,const string & object,GcsFileSystem * filesystem,GcsFileSystem::TimeoutConfig * timeouts,std::function<void ()> file_cache_erase,RetryConfig retry_config,bool compose_append,SessionCreator session_creator,ObjectUploader object_uploader,StatusPoller status_poller,GenerationGetter generation_getter)424 GcsWritableFile(const string& bucket, const string& object,
425 GcsFileSystem* filesystem,
426 GcsFileSystem::TimeoutConfig* timeouts,
427 std::function<void()> file_cache_erase,
428 RetryConfig retry_config, bool compose_append,
429 SessionCreator session_creator,
430 ObjectUploader object_uploader, StatusPoller status_poller,
431 GenerationGetter generation_getter)
432 : bucket_(bucket),
433 object_(object),
434 filesystem_(filesystem),
435 timeouts_(timeouts),
436 file_cache_erase_(std::move(file_cache_erase)),
437 sync_needed_(true),
438 retry_config_(retry_config),
439 compose_append_(compose_append),
440 start_offset_(0),
441 session_creator_(std::move(session_creator)),
442 object_uploader_(std::move(object_uploader)),
443 status_poller_(std::move(status_poller)),
444 generation_getter_(std::move(generation_getter)) {
445 // TODO: to make it safer, outfile_ should be constructed from an FD
446 VLOG(3) << "GcsWritableFile: " << GetGcsPath();
447 if (GetTmpFilename(&tmp_content_filename_).ok()) {
448 outfile_.open(tmp_content_filename_,
449 std::ofstream::binary | std::ofstream::app);
450 }
451 }
452
453 /// \brief Constructs the writable file in append mode.
454 ///
455 /// tmp_content_filename should contain a path of an existing temporary file
456 /// with the content to be appended. The class takes ownership of the
457 /// specified tmp file and deletes it on close.
GcsWritableFile(const string & bucket,const string & object,GcsFileSystem * filesystem,const string & tmp_content_filename,GcsFileSystem::TimeoutConfig * timeouts,std::function<void ()> file_cache_erase,RetryConfig retry_config,bool compose_append,SessionCreator session_creator,ObjectUploader object_uploader,StatusPoller status_poller,GenerationGetter generation_getter)458 GcsWritableFile(const string& bucket, const string& object,
459 GcsFileSystem* filesystem, const string& tmp_content_filename,
460 GcsFileSystem::TimeoutConfig* timeouts,
461 std::function<void()> file_cache_erase,
462 RetryConfig retry_config, bool compose_append,
463 SessionCreator session_creator,
464 ObjectUploader object_uploader, StatusPoller status_poller,
465 GenerationGetter generation_getter)
466 : bucket_(bucket),
467 object_(object),
468 filesystem_(filesystem),
469 timeouts_(timeouts),
470 file_cache_erase_(std::move(file_cache_erase)),
471 sync_needed_(true),
472 retry_config_(retry_config),
473 compose_append_(compose_append),
474 start_offset_(0),
475 session_creator_(std::move(session_creator)),
476 object_uploader_(std::move(object_uploader)),
477 status_poller_(std::move(status_poller)),
478 generation_getter_(std::move(generation_getter)) {
479 VLOG(3) << "GcsWritableFile: " << GetGcsPath() << "with existing file "
480 << tmp_content_filename;
481 tmp_content_filename_ = tmp_content_filename;
482 outfile_.open(tmp_content_filename_,
483 std::ofstream::binary | std::ofstream::app);
484 }
485
~GcsWritableFile()486 ~GcsWritableFile() override {
487 Close().IgnoreError();
488 std::remove(tmp_content_filename_.c_str());
489 }
490
Append(StringPiece data)491 Status Append(StringPiece data) override {
492 TF_RETURN_IF_ERROR(CheckWritable());
493 VLOG(3) << "Append: " << GetGcsPath() << " size " << data.length();
494 sync_needed_ = true;
495 outfile_ << data;
496 if (!outfile_.good()) {
497 return errors::Internal(
498 "Could not append to the internal temporary file.");
499 }
500 return OkStatus();
501 }
502
Close()503 Status Close() override {
504 VLOG(3) << "Close:" << GetGcsPath();
505 if (outfile_.is_open()) {
506 Status sync_status = Sync();
507 if (sync_status.ok()) {
508 outfile_.close();
509 }
510 return sync_status;
511 }
512 return OkStatus();
513 }
514
Flush()515 Status Flush() override {
516 VLOG(3) << "Flush:" << GetGcsPath();
517 return Sync();
518 }
519
Name(StringPiece * result) const520 Status Name(StringPiece* result) const override {
521 return errors::Unimplemented("GCSWritableFile does not support Name()");
522 }
523
Sync()524 Status Sync() override {
525 VLOG(3) << "Sync started:" << GetGcsPath();
526 TF_RETURN_IF_ERROR(CheckWritable());
527 if (!sync_needed_) {
528 return OkStatus();
529 }
530 Status status = SyncImpl();
531 VLOG(3) << "Sync finished " << GetGcsPath();
532 if (status.ok()) {
533 sync_needed_ = false;
534 }
535 return status;
536 }
537
Tell(int64_t * position)538 Status Tell(int64_t* position) override {
539 *position = outfile_.tellp();
540 if (*position == -1) {
541 return errors::Internal("tellp on the internal temporary file failed");
542 }
543 return OkStatus();
544 }
545
546 private:
547 /// Copies the current version of the file to GCS.
548 ///
549 /// This SyncImpl() uploads the object to GCS.
550 /// In case of a failure, it resumes failed uploads as recommended by the GCS
551 /// resumable API documentation. When the whole upload needs to be
552 /// restarted, Sync() returns UNAVAILABLE and relies on RetryingFileSystem.
SyncImpl()553 Status SyncImpl() {
554 outfile_.flush();
555 if (!outfile_.good()) {
556 return errors::Internal(
557 "Could not write to the internal temporary file.");
558 }
559 UploadSessionHandle session_handle;
560 uint64 start_offset = 0;
561 string object_to_upload = object_;
562 bool should_compose = false;
563 if (compose_append_) {
564 start_offset = start_offset_;
565 // Only compose if the object has already been uploaded to GCS
566 should_compose = start_offset > 0;
567 if (should_compose) {
568 object_to_upload =
569 strings::StrCat(io::Dirname(object_), "/.tmpcompose/",
570 io::Basename(object_), ".", start_offset_);
571 }
572 }
573 TF_RETURN_IF_ERROR(CreateNewUploadSession(start_offset, object_to_upload,
574 &session_handle));
575 uint64 already_uploaded = 0;
576 bool first_attempt = true;
577 const Status upload_status = RetryingUtils::CallWithRetries(
578 [&first_attempt, &already_uploaded, &session_handle, &start_offset,
579 this]() {
580 if (session_handle.resumable && !first_attempt) {
581 bool completed;
582 TF_RETURN_IF_ERROR(RequestUploadSessionStatus(
583 session_handle.session_uri, &completed, &already_uploaded));
584 LOG(INFO) << "### RequestUploadSessionStatus: completed = "
585 << completed
586 << ", already_uploaded = " << already_uploaded
587 << ", file = " << GetGcsPath();
588 if (completed) {
589 // Erase the file from the file cache on every successful write.
590 file_cache_erase_();
591 // It's unclear why UploadToSession didn't return OK in the
592 // previous attempt, but GCS reports that the file is fully
593 // uploaded, so succeed.
594 return OkStatus();
595 }
596 }
597 first_attempt = false;
598 return UploadToSession(session_handle.session_uri, start_offset,
599 already_uploaded);
600 },
601 retry_config_);
602 if (errors::IsNotFound(upload_status)) {
603 // GCS docs recommend retrying the whole upload. We're relying on the
604 // RetryingFileSystem to retry the Sync() call.
605 return errors::Unavailable(strings::StrCat(
606 "Upload to gs://", bucket_, "/", object_,
607 " failed, caused by: ", upload_status.error_message()));
608 }
609 if (upload_status.ok()) {
610 if (should_compose) {
611 TF_RETURN_IF_ERROR(AppendObject(object_to_upload));
612 }
613 TF_RETURN_IF_ERROR(GetCurrentFileSize(&start_offset_));
614 }
615 return upload_status;
616 }
617
CheckWritable() const618 Status CheckWritable() const {
619 if (!outfile_.is_open()) {
620 return errors::FailedPrecondition(
621 "The internal temporary file is not writable.");
622 }
623 return OkStatus();
624 }
625
GetCurrentFileSize(uint64 * size)626 Status GetCurrentFileSize(uint64* size) {
627 const auto tellp = outfile_.tellp();
628 if (tellp == static_cast<std::streampos>(-1)) {
629 return errors::Internal(
630 "Could not get the size of the internal temporary file.");
631 }
632 *size = tellp;
633 return OkStatus();
634 }
635
636 /// Initiates a new resumable upload session.
CreateNewUploadSession(uint64 start_offset,std::string object_to_upload,UploadSessionHandle * session_handle)637 Status CreateNewUploadSession(uint64 start_offset,
638 std::string object_to_upload,
639 UploadSessionHandle* session_handle) {
640 uint64 file_size;
641 TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size));
642 return session_creator_(start_offset, object_to_upload, bucket_, file_size,
643 GetGcsPath(), session_handle);
644 }
645
646 /// Appends the data of append_object to the original object and deletes
647 /// append_object.
AppendObject(string append_object)648 Status AppendObject(string append_object) {
649 const string append_object_path = GetGcsPathWithObject(append_object);
650 VLOG(3) << "AppendObject: " << append_object_path << " to " << GetGcsPath();
651
652 int64_t generation = 0;
653 TF_RETURN_IF_ERROR(
654 generation_getter_(GetGcsPath(), bucket_, object_, &generation));
655
656 TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
657 [&append_object, &generation, this]() {
658 std::unique_ptr<HttpRequest> request;
659 TF_RETURN_IF_ERROR(filesystem_->CreateHttpRequest(&request));
660
661 request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket_, "/o/",
662 request->EscapeString(object_),
663 "/compose"));
664
665 const string request_body = strings::StrCat(
666 "{'sourceObjects': [{'name': '", object_,
667 "','objectPrecondition':{'ifGenerationMatch':", generation,
668 "}},{'name': '", append_object, "'}]}");
669 request->SetTimeouts(timeouts_->connect, timeouts_->idle,
670 timeouts_->metadata);
671 request->AddHeader("content-type", "application/json");
672 request->SetPostFromBuffer(request_body.c_str(), request_body.size());
673 TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(),
674 " when composing to ", GetGcsPath());
675 return OkStatus();
676 },
677 retry_config_));
678
679 return RetryingUtils::DeleteWithRetries(
680 [&append_object_path, this]() {
681 return filesystem_->DeleteFile(append_object_path, nullptr);
682 },
683 retry_config_);
684 }
685
686 /// \brief Requests status of a previously initiated upload session.
687 ///
688 /// If the upload has already succeeded, sets 'completed' to true.
689 /// Otherwise sets 'completed' to false and 'uploaded' to the currently
690 /// uploaded size in bytes.
RequestUploadSessionStatus(const string & session_uri,bool * completed,uint64 * uploaded)691 Status RequestUploadSessionStatus(const string& session_uri, bool* completed,
692 uint64* uploaded) {
693 uint64 file_size;
694 TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size));
695 return status_poller_(session_uri, file_size, GetGcsPath(), completed,
696 uploaded);
697 }
698
699 /// Uploads data to object.
UploadToSession(const string & session_uri,uint64 start_offset,uint64 already_uploaded)700 Status UploadToSession(const string& session_uri, uint64 start_offset,
701 uint64 already_uploaded) {
702 uint64 file_size;
703 TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size));
704 Status status =
705 object_uploader_(session_uri, start_offset, already_uploaded,
706 tmp_content_filename_, file_size, GetGcsPath());
707 if (status.ok()) {
708 // Erase the file from the file cache on every successful write.
709 // Note: Only local cache, this does nothing on distributed cache. The
710 // distributed cache clears the cache as it is needed.
711 file_cache_erase_();
712 }
713
714 return status;
715 }
716
GetGcsPathWithObject(string object) const717 string GetGcsPathWithObject(string object) const {
718 return strings::StrCat("gs://", bucket_, "/", object);
719 }
GetGcsPath() const720 string GetGcsPath() const { return GetGcsPathWithObject(object_); }
721
722 string bucket_;
723 string object_;
724 GcsFileSystem* const filesystem_; // Not owned.
725 string tmp_content_filename_;
726 std::ofstream outfile_;
727 GcsFileSystem::TimeoutConfig* timeouts_;
728 std::function<void()> file_cache_erase_;
729 bool sync_needed_; // whether there is buffered data that needs to be synced
730 RetryConfig retry_config_;
731 bool compose_append_;
732 uint64 start_offset_;
733 // Callbacks to the file system used to upload object into GCS.
734 const SessionCreator session_creator_;
735 const ObjectUploader object_uploader_;
736 const StatusPoller status_poller_;
737 const GenerationGetter generation_getter_;
738 };
739
740 class GcsReadOnlyMemoryRegion : public ReadOnlyMemoryRegion {
741 public:
GcsReadOnlyMemoryRegion(std::unique_ptr<char[]> data,uint64 length)742 GcsReadOnlyMemoryRegion(std::unique_ptr<char[]> data, uint64 length)
743 : data_(std::move(data)), length_(length) {}
data()744 const void* data() override { return reinterpret_cast<void*>(data_.get()); }
length()745 uint64 length() override { return length_; }
746
747 private:
748 std::unique_ptr<char[]> data_;
749 uint64 length_;
750 };
751
/// Identity parser for GetEnvVar: passes the raw environment value through
/// unchanged and always reports success.
bool StringPieceIdentity(StringPiece str, StringPiece* value) {
  *value = str;
  return true;
}
756
757 /// \brief Utility function to split a comma delimited list of strings to an
758 /// unordered set, lowercasing all values.
SplitByCommaToLowercaseSet(StringPiece list,std::unordered_set<string> * set)759 bool SplitByCommaToLowercaseSet(StringPiece list,
760 std::unordered_set<string>* set) {
761 std::vector<string> vector = absl::StrSplit(absl::AsciiStrToLower(list), ',');
762 *set = std::unordered_set<string>(vector.begin(), vector.end());
763 return true;
764 }
765
// \brief Convert Compute Engine zone to region.
// Strips the trailing "-<zone-letter>" suffix (e.g. "us-east1-b" ->
// "us-east1"). If no '-' is present the whole string is returned unchanged,
// since substr(0, npos) yields the full string.
string ZoneToRegion(string* zone) {
  const size_t last_dash = zone->find_last_of('-');
  return zone->substr(0, last_dash);
}
770
771 } // namespace
772
// Constructs a GcsFileSystem configured from environment variables. When
// `make_default_cache` is false the block cache size is forced to zero
// (caching disabled) regardless of any environment overrides.
GcsFileSystem::GcsFileSystem(bool make_default_cache) {
  uint64 value;
  block_size_ = kDefaultBlockSize;
  size_t max_bytes = kDefaultMaxCacheSize;

  uint64 max_staleness = kDefaultMaxStaleness;

  // Auth and zone providers share one Compute Engine metadata client, which
  // in turn shares the HTTP request factory.
  http_request_factory_ = std::make_shared<CurlHttpRequest::Factory>();
  compute_engine_metadata_client_ =
      std::make_shared<ComputeEngineMetadataClient>(http_request_factory_);
  auth_provider_ = std::unique_ptr<AuthProvider>(
      new GoogleAuthProvider(compute_engine_metadata_client_));
  zone_provider_ = std::unique_ptr<ZoneProvider>(
      new ComputeEngineZoneProvider(compute_engine_metadata_client_));

  // Apply the sys env override for the readahead buffer size if it's provided.
  if (GetEnvVar(kReadaheadBufferSize, strings::safe_strtou64, &value)) {
    block_size_ = value;
  }

  // Apply the overrides for the block size (MB), max bytes (MB), and max
  // staleness (seconds) if provided.
  // NOTE: kBlockSize is applied after kReadaheadBufferSize, so if both are
  // set, kBlockSize wins.
  if (GetEnvVar(kBlockSize, strings::safe_strtou64, &value)) {
    block_size_ = value * 1024 * 1024;
  }

  if (GetEnvVar(kMaxCacheSize, strings::safe_strtou64, &value)) {
    max_bytes = value * 1024 * 1024;
  }

  if (GetEnvVar(kMaxStaleness, strings::safe_strtou64, &value)) {
    max_staleness = value;
  }
  // A zero-byte cache disables block caching entirely (see MakeFileBlockCache).
  if (!make_default_cache) {
    max_bytes = 0;
  }
  VLOG(1) << "GCS cache max size = " << max_bytes << " ; "
          << "block size = " << block_size_ << " ; "
          << "max staleness = " << max_staleness;
  file_block_cache_ = MakeFileBlockCache(block_size_, max_bytes, max_staleness);
  // Apply overrides for the stat cache max age and max entries, if provided.
  uint64 stat_cache_max_age = kStatCacheDefaultMaxAge;
  size_t stat_cache_max_entries = kStatCacheDefaultMaxEntries;
  if (GetEnvVar(kStatCacheMaxAge, strings::safe_strtou64, &value)) {
    stat_cache_max_age = value;
  }
  if (GetEnvVar(kStatCacheMaxEntries, strings::safe_strtou64, &value)) {
    stat_cache_max_entries = value;
  }
  stat_cache_.reset(new ExpiringLRUCache<GcsFileStat>(stat_cache_max_age,
                                                      stat_cache_max_entries));
  // Apply overrides for the matching paths cache max age and max entries, if
  // provided.
  uint64 matching_paths_cache_max_age = kMatchingPathsCacheDefaultMaxAge;
  size_t matching_paths_cache_max_entries =
      kMatchingPathsCacheDefaultMaxEntries;
  if (GetEnvVar(kMatchingPathsCacheMaxAge, strings::safe_strtou64, &value)) {
    matching_paths_cache_max_age = value;
  }
  if (GetEnvVar(kMatchingPathsCacheMaxEntries, strings::safe_strtou64,
                &value)) {
    matching_paths_cache_max_entries = value;
  }
  matching_paths_cache_.reset(new ExpiringLRUCache<std::vector<string>>(
      matching_paths_cache_max_age, matching_paths_cache_max_entries));

  // Bucket locations never change, so that cache never expires.
  bucket_location_cache_.reset(new ExpiringLRUCache<string>(
      kCacheNeverExpire, kBucketLocationCacheMaxEntries));

  // DNS caching is opt-in: it is enabled only when the refresh-period env var
  // is set (and parses as a nonzero int64).
  int64_t resolve_frequency_secs;
  if (GetEnvVar(kResolveCacheSecs, strings::safe_strto64,
                &resolve_frequency_secs)) {
    dns_cache_.reset(new GcsDnsCache(resolve_frequency_secs));
    VLOG(1) << "GCS DNS cache is enabled. " << kResolveCacheSecs << " = "
            << resolve_frequency_secs;
  } else {
    VLOG(1) << "GCS DNS cache is disabled, because " << kResolveCacheSecs
            << " = 0 (or is not set)";
  }

  // Get the additional header. The expected format is "Name:Value"; malformed
  // contents (no colon, or an empty name/value) disable the feature.
  StringPiece add_header_contents;
  if (GetEnvVar(kAdditionalRequestHeader, StringPieceIdentity,
                &add_header_contents)) {
    size_t split = add_header_contents.find(':', 0);

    if (split != StringPiece::npos) {
      StringPiece header_name = add_header_contents.substr(0, split);
      StringPiece header_value = add_header_contents.substr(split + 1);

      if (!header_name.empty() && !header_value.empty()) {
        additional_header_.reset(new std::pair<const string, const string>(
            string(header_name), string(header_value)));

        VLOG(1) << "GCS additional header ENABLED. "
                << "Name: " << additional_header_->first << ", "
                << "Value: " << additional_header_->second;
      } else {
        LOG(ERROR) << "GCS additional header DISABLED. Invalid contents: "
                   << add_header_contents;
      }
    } else {
      LOG(ERROR) << "GCS additional header DISABLED. Invalid contents: "
                 << add_header_contents;
    }
  } else {
    VLOG(1) << "GCS additional header DISABLED. No environment variable set.";
  }

  // Apply the overrides for request timeouts
  uint32 timeout_value;
  if (GetEnvVar(kRequestConnectionTimeout, strings::safe_strtou32,
                &timeout_value)) {
    timeouts_.connect = timeout_value;
  }
  if (GetEnvVar(kRequestIdleTimeout, strings::safe_strtou32, &timeout_value)) {
    timeouts_.idle = timeout_value;
  }
  if (GetEnvVar(kMetadataRequestTimeout, strings::safe_strtou32,
                &timeout_value)) {
    timeouts_.metadata = timeout_value;
  }
  if (GetEnvVar(kReadRequestTimeout, strings::safe_strtou32, &timeout_value)) {
    timeouts_.read = timeout_value;
  }
  if (GetEnvVar(kWriteRequestTimeout, strings::safe_strtou32, &timeout_value)) {
    timeouts_.write = timeout_value;
  }

  // Throttling is enabled only if a token rate is set; the remaining knobs
  // are optional refinements on top of it.
  int64_t token_value;
  if (GetEnvVar(kThrottleRate, strings::safe_strto64, &token_value)) {
    GcsThrottleConfig config;
    config.enabled = true;
    config.token_rate = token_value;

    if (GetEnvVar(kThrottleBucket, strings::safe_strto64, &token_value)) {
      config.bucket_size = token_value;
    }

    if (GetEnvVar(kTokensPerRequest, strings::safe_strto64, &token_value)) {
      config.tokens_per_request = token_value;
    }

    if (GetEnvVar(kInitialTokens, strings::safe_strto64, &token_value)) {
      config.initial_tokens = token_value;
    }
    throttle_.SetConfig(config);
  }

  GetEnvVar(kAllowedBucketLocations, SplitByCommaToLowercaseSet,
            &allowed_locations_);

  StringPiece append_mode;
  GetEnvVar(kAppendMode, StringPieceIdentity, &append_mode);
  if (append_mode == kComposeAppend) {
    compose_append_ = true;
  } else {
    compose_append_ = false;
  }
}
933
// Fully-parameterized constructor, used by tests and by callers that want
// explicit control over caching, retry, and timeout behavior instead of the
// environment-variable-driven defaults. Takes ownership of the provider,
// factory, and `additional_header` arguments.
GcsFileSystem::GcsFileSystem(
    std::unique_ptr<AuthProvider> auth_provider,
    std::unique_ptr<HttpRequest::Factory> http_request_factory,
    std::unique_ptr<ZoneProvider> zone_provider, size_t block_size,
    size_t max_bytes, uint64 max_staleness, uint64 stat_cache_max_age,
    size_t stat_cache_max_entries, uint64 matching_paths_cache_max_age,
    size_t matching_paths_cache_max_entries, RetryConfig retry_config,
    TimeoutConfig timeouts, const std::unordered_set<string>& allowed_locations,
    std::pair<const string, const string>* additional_header,
    bool compose_append)
    : timeouts_(timeouts),
      retry_config_(retry_config),
      auth_provider_(std::move(auth_provider)),
      http_request_factory_(std::move(http_request_factory)),
      zone_provider_(std::move(zone_provider)),
      block_size_(block_size),
      file_block_cache_(
          MakeFileBlockCache(block_size, max_bytes, max_staleness)),
      stat_cache_(new StatCache(stat_cache_max_age, stat_cache_max_entries)),
      matching_paths_cache_(new MatchingPathsCache(
          matching_paths_cache_max_age, matching_paths_cache_max_entries)),
      bucket_location_cache_(new BucketLocationCache(
          kCacheNeverExpire, kBucketLocationCacheMaxEntries)),
      allowed_locations_(allowed_locations),
      compose_append_(compose_append),
      additional_header_(additional_header) {}
960
// Creates a random-access file handle for `fname`. When the block cache is
// enabled, reads go through file_block_cache_ with a generation-number
// signature check; otherwise a buffered reader fetches directly from GCS.
// Either way, short reads are reported as OUT_OF_RANGE (EOF).
Status GcsFileSystem::NewRandomAccessFile(
    const string& fname, TransactionToken* token,
    std::unique_ptr<RandomAccessFile>* result) {
  string bucket, object;
  TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
  TF_RETURN_IF_ERROR(CheckBucketLocationConstraint(bucket));
  if (cache_enabled_) {
    result->reset(new GcsRandomAccessFile(fname, [this, bucket, object](
                                                     const string& fname,
                                                     uint64 offset, size_t n,
                                                     StringPiece* result,
                                                     char* scratch) {
      // Shared lock: concurrent readers may use the block cache; only cache
      // replacement (ResetFileBlockCache) takes the exclusive lock.
      tf_shared_lock l(block_cache_lock_);
      GcsFileStat stat;
      TF_RETURN_IF_ERROR(stat_cache_->LookupOrCompute(
          fname, &stat,
          [this, bucket, object](const string& fname, GcsFileStat* stat) {
            return UncachedStatForObject(fname, bucket, object, stat);
          }));
      // If the object generation changed since the blocks were cached, the
      // cache drops the stale blocks for this file.
      if (!file_block_cache_->ValidateAndUpdateFileSignature(
              fname, stat.generation_number)) {
        VLOG(1)
            << "File signature has been changed. Refreshing the cache. Path: "
            << fname;
      }
      *result = StringPiece();
      size_t bytes_transferred;
      TF_RETURN_IF_ERROR(file_block_cache_->Read(fname, offset, n, scratch,
                                                 &bytes_transferred));
      *result = StringPiece(scratch, bytes_transferred);
      if (bytes_transferred < n) {
        return errors::OutOfRange("EOF reached, ", result->size(),
                                  " bytes were read out of ", n,
                                  " bytes requested.");
      }
      return OkStatus();
    }));
  } else {
    // Cache disabled: read straight from GCS with block-sized buffering.
    result->reset(new BufferedGcsRandomAccessFile(
        fname, block_size_,
        [this, bucket, object](const string& fname, uint64 offset, size_t n,
                               StringPiece* result, char* scratch) {
          *result = StringPiece();
          size_t bytes_transferred;
          TF_RETURN_IF_ERROR(
              LoadBufferFromGCS(fname, offset, n, scratch, &bytes_transferred));
          *result = StringPiece(scratch, bytes_transferred);
          if (bytes_transferred < n) {
            return errors::OutOfRange("EOF reached, ", result->size(),
                                      " bytes were read out of ", n,
                                      " bytes requested.");
          }
          return OkStatus();
        }));
  }
  return OkStatus();
}
1018
ResetFileBlockCache(size_t block_size_bytes,size_t max_bytes,uint64 max_staleness_secs)1019 void GcsFileSystem::ResetFileBlockCache(size_t block_size_bytes,
1020 size_t max_bytes,
1021 uint64 max_staleness_secs) {
1022 mutex_lock l(block_cache_lock_);
1023 file_block_cache_ =
1024 MakeFileBlockCache(block_size_bytes, max_bytes, max_staleness_secs);
1025 if (stats_ != nullptr) {
1026 stats_->Configure(this, &throttle_, file_block_cache_.get());
1027 }
1028 }
1029
1030 // A helper function to build a FileBlockCache for GcsFileSystem.
MakeFileBlockCache(size_t block_size,size_t max_bytes,uint64 max_staleness)1031 std::unique_ptr<FileBlockCache> GcsFileSystem::MakeFileBlockCache(
1032 size_t block_size, size_t max_bytes, uint64 max_staleness) {
1033 std::unique_ptr<FileBlockCache> file_block_cache(new RamFileBlockCache(
1034 block_size, max_bytes, max_staleness,
1035 [this](const string& filename, size_t offset, size_t n, char* buffer,
1036 size_t* bytes_transferred) {
1037 return LoadBufferFromGCS(filename, offset, n, buffer,
1038 bytes_transferred);
1039 }));
1040
1041 // Check if cache is enabled here to avoid unnecessary mutex contention.
1042 cache_enabled_ = file_block_cache->IsCacheEnabled();
1043 return file_block_cache;
1044 }
1045
// A helper function to actually read the data from GCS. Performs a ranged
// HTTP GET of [offset, offset + n) directly into `buffer`, records stats and
// throttle usage, and cross-checks short reads against the cached object
// length to detect interrupted transfers.
Status GcsFileSystem::LoadBufferFromGCS(const string& fname, size_t offset,
                                        size_t n, char* buffer,
                                        size_t* bytes_transferred) {
  *bytes_transferred = 0;

  string bucket, object;
  TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));

  profiler::TraceMe activity(
      [fname]() { return absl::StrCat("LoadBufferFromGCS ", fname); });

  std::unique_ptr<HttpRequest> request;
  TF_RETURN_WITH_CONTEXT_IF_ERROR(CreateHttpRequest(&request),
                                  "when reading gs://", bucket, "/", object);

  request->SetUri(strings::StrCat("https://", kStorageHost, "/", bucket, "/",
                                  request->EscapeString(object)));
  // HTTP Range headers are inclusive on both ends, hence the -1.
  request->SetRange(offset, offset + n - 1);
  request->SetResultBufferDirect(buffer, n);
  request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.read);

  if (stats_ != nullptr) {
    stats_->RecordBlockLoadRequest(fname, offset);
  }

  TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when reading gs://",
                                  bucket, "/", object);

  size_t bytes_read = request->GetResultBufferDirectBytesTransferred();
  *bytes_transferred = bytes_read;
  VLOG(1) << "Successful read of gs://" << bucket << "/" << object << " @ "
          << offset << " of size: " << bytes_read;
  activity.AppendMetadata([bytes_read]() {
    return profiler::TraceMeEncode({{"block_size", bytes_read}});
  });

  if (stats_ != nullptr) {
    stats_->RecordBlockRetrieved(fname, offset, bytes_read);
  }

  throttle_.RecordResponse(bytes_read);

  if (bytes_read < n) {
    // Check stat cache to see if we encountered an interrupted read.
    // A short read before the known end of the object means the transfer was
    // cut off, so surface it as an error rather than silent truncation.
    GcsFileStat stat;
    if (stat_cache_->Lookup(fname, &stat)) {
      if (offset + bytes_read < stat.base.length) {
        return errors::Internal(strings::Printf(
            "File contents are inconsistent for file: %s @ %lu.", fname.c_str(),
            offset));
      }
      VLOG(2) << "Successful integrity check for: gs://" << bucket << "/"
              << object << " @ " << offset;
    }
  }

  return OkStatus();
}
1105
/// Initiates a new resumable upload session with GCS for
/// `object_to_upload` in `bucket`. On success, fills `session_handle`
/// (if non-null) with the session URI returned by GCS in the 'Location'
/// header; subsequent chunks are PUT to that URI.
Status GcsFileSystem::CreateNewUploadSession(
    uint64 start_offset, const std::string& object_to_upload,
    const std::string& bucket, uint64 file_size, const std::string& gcs_path,
    UploadSessionHandle* session_handle) {
  std::vector<char> output_buffer;
  std::unique_ptr<HttpRequest> request;
  TF_RETURN_IF_ERROR(CreateHttpRequest(&request));

  std::string uri = strings::StrCat(
      kGcsUploadUriBase, "b/", bucket,
      "/o?uploadType=resumable&name=", request->EscapeString(object_to_upload));
  request->SetUri(uri);
  // Only the bytes from start_offset onward will be uploaded in this session.
  request->AddHeader("X-Upload-Content-Length",
                     absl::StrCat(file_size - start_offset));
  request->SetPostEmptyBody();
  request->SetResultBuffer(&output_buffer);
  request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
  TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(),
                                  " when initiating an upload to ", gcs_path);
  if (session_handle != nullptr) {
    session_handle->resumable = true;
    session_handle->session_uri = request->GetResponseHeader("Location");
    if (session_handle->session_uri.empty()) {
      return errors::Internal("Unexpected response from GCS when writing to ",
                              gcs_path, ": 'Location' header not returned.");
    }
  }
  return OkStatus();
}
1136
// Uploads the remainder of the local temp file to an existing resumable
// upload session. `already_uploaded` bytes (relative to start_offset) are
// skipped; the Content-Range header tells GCS which byte range this PUT
// carries out of the session's total size.
Status GcsFileSystem::UploadToSession(const std::string& session_uri,
                                      uint64 start_offset,
                                      uint64 already_uploaded,
                                      const std::string& tmp_content_filename,
                                      uint64 file_size,
                                      const std::string& file_path) {
  std::unique_ptr<HttpRequest> request;
  TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
  request->SetUri(session_uri);
  if (file_size > 0) {
    // Range is inclusive, hence the -1 on the end byte.
    request->AddHeader("Content-Range",
                       strings::StrCat("bytes ", already_uploaded, "-",
                                       file_size - start_offset - 1, "/",
                                       file_size - start_offset));
  }
  request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.write);

  TF_RETURN_IF_ERROR(request->SetPutFromFile(tmp_content_filename,
                                             start_offset + already_uploaded));
  TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when uploading ",
                                  file_path);
  return OkStatus();
}
1160
// Queries GCS for the state of a resumable upload session. Sets `*completed`
// to true if the upload finished; otherwise parses the 'Range' response
// header to report how many bytes GCS has received in `*uploaded`.
Status GcsFileSystem::RequestUploadSessionStatus(const string& session_uri,
                                                 uint64 file_size,
                                                 const std::string& gcs_path,
                                                 bool* completed,
                                                 uint64* uploaded) {
  CHECK(completed != nullptr) << "RequestUploadSessionStatus() called with out "
                                 "param 'completed' == nullptr.";  // Crash ok
  CHECK(uploaded != nullptr) << "RequestUploadSessionStatus() called with out "
                                "param 'uploaded' == nullptr.";  // Crash ok
  std::unique_ptr<HttpRequest> request;
  TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
  request->SetUri(session_uri);
  request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
  // An empty PUT with "bytes */<total>" is the standard probe for resumable
  // upload status; a success response means the upload already completed.
  request->AddHeader("Content-Range", strings::StrCat("bytes */", file_size));
  request->SetPutEmptyBody();
  Status status = request->Send();
  if (status.ok()) {
    *completed = true;
    return OkStatus();
  }
  *completed = false;
  // Any response other than "resume incomplete" is a genuine error.
  if (request->GetResponseCode() != HTTP_CODE_RESUME_INCOMPLETE) {
    TF_RETURN_WITH_CONTEXT_IF_ERROR(status, " when resuming upload ", gcs_path);
  }
  const std::string received_range = request->GetResponseHeader("Range");
  if (received_range.empty()) {
    // This means GCS doesn't have any bytes of the file yet.
    *uploaded = 0;
  } else {
    StringPiece range_piece(received_range);
    absl::ConsumePrefix(&range_piece,
                        "bytes=");  // May or may not be present.

    auto return_error = [](const std::string& gcs_path,
                           const std::string& error_message) {
      return errors::Internal("Unexpected response from GCS when writing ",
                              gcs_path, ": ", error_message);
    };

    // Expected form: "<first>-<last>", e.g. "0-1048575".
    std::vector<string> range_strs = str_util::Split(range_piece, '-');
    if (range_strs.size() != 2) {
      return return_error(gcs_path, "Range header '" + received_range +
                                        "' could not be parsed.");
    }

    std::vector<int64_t> range_parts;
    for (const std::string& range_str : range_strs) {
      int64_t tmp;
      if (strings::safe_strto64(range_str, &tmp)) {
        range_parts.push_back(tmp);
      } else {
        return return_error(gcs_path, "Range header '" + received_range +
                                          "' could not be parsed.");
      }
    }

    if (range_parts[0] != 0) {
      return return_error(gcs_path, "The returned range '" + received_range +
                                        "' does not start at zero.");
    }
    // If GCS returned "Range: 0-10", this means 11 bytes were uploaded.
    *uploaded = range_parts[1] + 1;
  }
  return OkStatus();
}
1226
ParseGcsPathForScheme(StringPiece fname,string scheme,bool empty_object_ok,string * bucket,string * object)1227 Status GcsFileSystem::ParseGcsPathForScheme(StringPiece fname, string scheme,
1228 bool empty_object_ok,
1229 string* bucket, string* object) {
1230 StringPiece parsed_scheme, bucketp, objectp;
1231 io::ParseURI(fname, &parsed_scheme, &bucketp, &objectp);
1232 if (parsed_scheme != scheme) {
1233 return errors::InvalidArgument("GCS path doesn't start with 'gs://': ",
1234 fname);
1235 }
1236 *bucket = string(bucketp);
1237 if (bucket->empty() || *bucket == ".") {
1238 return errors::InvalidArgument("GCS path doesn't contain a bucket name: ",
1239 fname);
1240 }
1241 absl::ConsumePrefix(&objectp, "/");
1242 *object = string(objectp);
1243 if (!empty_object_ok && object->empty()) {
1244 return errors::InvalidArgument("GCS path doesn't contain an object name: ",
1245 fname);
1246 }
1247 return OkStatus();
1248 }
1249
// Parses a path under the canonical "gs://" scheme; see ParseGcsPathForScheme
// for the validation rules applied to bucket and object.
Status GcsFileSystem::ParseGcsPath(StringPiece fname, bool empty_object_ok,
                                   string* bucket, string* object) {
  return ParseGcsPathForScheme(fname, "gs", empty_object_ok, bucket, object);
}
1254
// Evicts `fname` from both the block cache and the stat cache, e.g. after the
// file is written or deleted so stale data is not served.
void GcsFileSystem::ClearFileCaches(const string& fname) {
  tf_shared_lock l(block_cache_lock_);
  file_block_cache_->RemoveFile(fname);
  stat_cache_->Delete(fname);
  // TODO(rxsang): Remove the patterns that match the file in
  // MatchingPathsCache as well.
}
1262
// Creates a writable file that starts empty and uploads to `fname` via the
// GCS resumable-upload protocol. The four lambdas bind this filesystem's
// upload primitives into the GcsWritableFile, which drives the protocol.
Status GcsFileSystem::NewWritableFile(const string& fname,
                                      TransactionToken* token,
                                      std::unique_ptr<WritableFile>* result) {
  string bucket, object;
  TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));

  // Starts a resumable upload session for the object.
  auto session_creator =
      [this](uint64 start_offset, const std::string& object_to_upload,
             const std::string& bucket, uint64 file_size,
             const std::string& gcs_path, UploadSessionHandle* session_handle) {
        return CreateNewUploadSession(start_offset, object_to_upload, bucket,
                                      file_size, gcs_path, session_handle);
      };
  // Uploads the buffered local content to an open session.
  auto object_uploader =
      [this](const std::string& session_uri, uint64 start_offset,
             uint64 already_uploaded, const std::string& tmp_content_filename,
             uint64 file_size, const std::string& file_path) {
        return UploadToSession(session_uri, start_offset, already_uploaded,
                               tmp_content_filename, file_size, file_path);
      };
  // Asks GCS how much of an interrupted upload already arrived.
  auto status_poller = [this](const string& session_uri, uint64 file_size,
                              const std::string& gcs_path, bool* completed,
                              uint64* uploaded) {
    return RequestUploadSessionStatus(session_uri, file_size, gcs_path,
                                      completed, uploaded);
  };

  // Fetches the object's current generation number (with retries), used to
  // detect concurrent modification.
  auto generation_getter = [this](const string& fname, const string& bucket,
                                  const string& object, int64* generation) {
    GcsFileStat stat;
    TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
        [&fname, &bucket, &object, &stat, this]() {
          return UncachedStatForObject(fname, bucket, object, &stat);
        },
        retry_config_));
    *generation = stat.generation_number;
    return OkStatus();
  };

  result->reset(new GcsWritableFile(
      bucket, object, this, &timeouts_,
      [this, fname]() { ClearFileCaches(fname); }, retry_config_,
      compose_append_, session_creator, object_uploader, status_poller,
      generation_getter));
  return OkStatus();
}
1309
// Reads the file from GCS in chunks and stores it in a tmp file,
// which is then passed to GcsWritableFile so appended data is uploaded
// together with the pre-existing content.
Status GcsFileSystem::NewAppendableFile(const string& fname,
                                        TransactionToken* token,
                                        std::unique_ptr<WritableFile>* result) {
  std::unique_ptr<RandomAccessFile> reader;
  TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, token, &reader));
  std::unique_ptr<char[]> buffer(new char[kReadAppendableFileBufferSize]);
  Status status;
  uint64 offset = 0;
  StringPiece read_chunk;

  // Read the file from GCS in chunks and save it to a tmp file.
  string old_content_filename;
  TF_RETURN_IF_ERROR(GetTmpFilename(&old_content_filename));
  std::ofstream old_content(old_content_filename, std::ofstream::binary);
  while (true) {
    status = reader->Read(offset, kReadAppendableFileBufferSize, &read_chunk,
                          buffer.get());
    if (status.ok()) {
      old_content << read_chunk;
      offset += kReadAppendableFileBufferSize;
    } else if (status.code() == error::NOT_FOUND) {
      // New file, there is no existing content in it.
      break;
    } else if (status.code() == error::OUT_OF_RANGE) {
      // Expected, this means we reached EOF. Write out the final (partial)
      // chunk before stopping.
      old_content << read_chunk;
      break;
    } else {
      return status;
    }
  }
  old_content.close();

  // The following lambdas mirror NewWritableFile: they bind this filesystem's
  // resumable-upload primitives into the GcsWritableFile.
  auto session_creator =
      [this](uint64 start_offset, const std::string& object_to_upload,
             const std::string& bucket, uint64 file_size,
             const std::string& gcs_path, UploadSessionHandle* session_handle) {
        return CreateNewUploadSession(start_offset, object_to_upload, bucket,
                                      file_size, gcs_path, session_handle);
      };
  auto object_uploader =
      [this](const std::string& session_uri, uint64 start_offset,
             uint64 already_uploaded, const std::string& tmp_content_filename,
             uint64 file_size, const std::string& file_path) {
        return UploadToSession(session_uri, start_offset, already_uploaded,
                               tmp_content_filename, file_size, file_path);
      };

  auto status_poller = [this](const string& session_uri, uint64 file_size,
                              const std::string& gcs_path, bool* completed,
                              uint64* uploaded) {
    return RequestUploadSessionStatus(session_uri, file_size, gcs_path,
                                      completed, uploaded);
  };

  auto generation_getter = [this](const string& fname, const string& bucket,
                                  const string& object, int64* generation) {
    GcsFileStat stat;
    TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
        [&fname, &bucket, &object, &stat, this]() {
          return UncachedStatForObject(fname, bucket, object, &stat);
        },
        retry_config_));
    *generation = stat.generation_number;
    return OkStatus();
  };

  // Create a writable file and pass the old content to it.
  string bucket, object;
  TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
  result->reset(new GcsWritableFile(
      bucket, object, this, old_content_filename, &timeouts_,
      [this, fname]() { ClearFileCaches(fname); }, retry_config_,
      compose_append_, session_creator, object_uploader, status_poller,
      generation_getter));
  return OkStatus();
}
1389
// Loads the entire object into a heap buffer and exposes it as a read-only
// memory region. The whole file is read eagerly, so this is only suitable for
// objects that fit in memory.
Status GcsFileSystem::NewReadOnlyMemoryRegionFromFile(
    const string& fname, TransactionToken* token,
    std::unique_ptr<ReadOnlyMemoryRegion>* result) {
  uint64 size;
  TF_RETURN_IF_ERROR(GetFileSize(fname, token, &size));
  std::unique_ptr<char[]> data(new char[size]);

  std::unique_ptr<RandomAccessFile> file;
  TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, token, &file));

  StringPiece piece;
  // Read exactly `size` bytes; a short read surfaces as an error here.
  TF_RETURN_IF_ERROR(file->Read(0, size, &piece, data.get()));

  result->reset(new GcsReadOnlyMemoryRegion(std::move(data), size));
  return OkStatus();
}
1406
// Returns OK if `fname` names an existing bucket, object, or folder
// (directory marker / virtual directory); NotFound otherwise.
Status GcsFileSystem::FileExists(const string& fname, TransactionToken* token) {
  string bucket, object;
  TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object));
  if (object.empty()) {
    // "gs://bucket" with no object: existence means the bucket exists.
    bool result;
    TF_RETURN_IF_ERROR(BucketExists(bucket, &result));
    if (result) {
      return OkStatus();
    }
  }

  // Check if the object exists.
  GcsFileStat stat;
  const Status status = StatForObject(fname, bucket, object, &stat);
  if (!errors::IsNotFound(status)) {
    // Either the object exists (OK) or the stat failed for another reason;
    // propagate that status either way.
    return status;
  }

  // Check if the folder exists.
  bool result;
  TF_RETURN_IF_ERROR(FolderExists(fname, &result));
  if (result) {
    return OkStatus();
  }
  return errors::NotFound("The specified path ", fname, " was not found.");
}
1433
ObjectExists(const string & fname,const string & bucket,const string & object,bool * result)1434 Status GcsFileSystem::ObjectExists(const string& fname, const string& bucket,
1435 const string& object, bool* result) {
1436 GcsFileStat stat;
1437 const Status status = StatForObject(fname, bucket, object, &stat);
1438 switch (static_cast<int>(status.code())) {
1439 case static_cast<int>(error::Code::OK):
1440 *result = !stat.base.is_directory;
1441 return OkStatus();
1442 case static_cast<int>(error::Code::NOT_FOUND):
1443 *result = false;
1444 return OkStatus();
1445 default:
1446 return status;
1447 }
1448 }
1449
// Fetches size, generation, and mtime for the object directly from the GCS
// metadata endpoint, bypassing the stat cache. Fills `stat` on success.
Status GcsFileSystem::UncachedStatForObject(const string& fname,
                                            const string& bucket,
                                            const string& object,
                                            GcsFileStat* stat) {
  std::vector<char> output_buffer;
  std::unique_ptr<HttpRequest> request;
  TF_RETURN_WITH_CONTEXT_IF_ERROR(CreateHttpRequest(&request),
                                  " when reading metadata of gs://", bucket,
                                  "/", object);

  // Only request the three fields we parse below to keep the response small.
  request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket, "/o/",
                                  request->EscapeString(object),
                                  "?fields=size%2Cgeneration%2Cupdated"));
  request->SetResultBuffer(&output_buffer);
  request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);

  if (stats_ != nullptr) {
    stats_->RecordStatObjectRequest();
  }

  TF_RETURN_WITH_CONTEXT_IF_ERROR(
      request->Send(), " when reading metadata of gs://", bucket, "/", object);

  Json::Value root;
  TF_RETURN_IF_ERROR(ParseJson(output_buffer, &root));

  // Parse file size.
  TF_RETURN_IF_ERROR(GetInt64Value(root, "size", &stat->base.length));

  // Parse generation number.
  TF_RETURN_IF_ERROR(
      GetInt64Value(root, "generation", &stat->generation_number));

  // Parse file modification time.
  string updated;
  TF_RETURN_IF_ERROR(GetStringValue(root, "updated", &updated));
  TF_RETURN_IF_ERROR(ParseRfc3339Time(updated, &(stat->base.mtime_nsec)));

  VLOG(1) << "Stat of: gs://" << bucket << "/" << object << " -- "
          << " length: " << stat->base.length
          << " generation: " << stat->generation_number
          << "; mtime_nsec: " << stat->base.mtime_nsec
          << "; updated: " << updated;

  if (str_util::EndsWith(fname, "/")) {
    // In GCS a path can be both a directory and a file, but it is uncommon for
    // other file systems. To avoid the ambiguity, if a path ends with "/" in
    // GCS, we always regard it as a directory mark or a virtual directory.
    stat->base.is_directory = true;
  } else {
    stat->base.is_directory = false;
  }
  return OkStatus();
}
1504
StatForObject(const string & fname,const string & bucket,const string & object,GcsFileStat * stat)1505 Status GcsFileSystem::StatForObject(const string& fname, const string& bucket,
1506 const string& object, GcsFileStat* stat) {
1507 if (object.empty()) {
1508 return errors::InvalidArgument(strings::Printf(
1509 "'object' must be a non-empty string. (File: %s)", fname.c_str()));
1510 }
1511
1512 TF_RETURN_IF_ERROR(stat_cache_->LookupOrCompute(
1513 fname, stat,
1514 [this, &bucket, &object](const string& fname, GcsFileStat* stat) {
1515 return UncachedStatForObject(fname, bucket, object, stat);
1516 }));
1517 return OkStatus();
1518 }
1519
BucketExists(const string & bucket,bool * result)1520 Status GcsFileSystem::BucketExists(const string& bucket, bool* result) {
1521 const Status status = GetBucketMetadata(bucket, nullptr);
1522 switch (status.code()) {
1523 case errors::Code::OK:
1524 *result = true;
1525 return OkStatus();
1526 case errors::Code::NOT_FOUND:
1527 *result = false;
1528 return OkStatus();
1529 default:
1530 return status;
1531 }
1532 }
1533
// Verifies that `bucket` resides in one of allowed_locations_. An empty
// allow-list means no constraint. The special "detect zone" sentinel is
// lazily resolved to this VM's region on first use.
Status GcsFileSystem::CheckBucketLocationConstraint(const string& bucket) {
  if (allowed_locations_.empty()) {
    return OkStatus();
  }

  // Avoid calling external API's in the constructor: resolve the sentinel to
  // the local region only when a constraint check actually happens.
  if (allowed_locations_.erase(kDetectZoneSentinelValue) == 1) {
    string zone;
    TF_RETURN_IF_ERROR(zone_provider_->GetZone(&zone));
    allowed_locations_.insert(ZoneToRegion(&zone));
  }

  string location;
  TF_RETURN_IF_ERROR(GetBucketLocation(bucket, &location));
  if (allowed_locations_.find(location) != allowed_locations_.end()) {
    return OkStatus();
  }

  return errors::FailedPrecondition(strings::Printf(
      "Bucket '%s' is in '%s' location, allowed locations are: (%s).",
      bucket.c_str(), location.c_str(),
      absl::StrJoin(allowed_locations_, ", ").c_str()));
}
1557
GetBucketLocation(const string & bucket,string * location)1558 Status GcsFileSystem::GetBucketLocation(const string& bucket,
1559 string* location) {
1560 auto compute_func = [this](const string& bucket, string* location) {
1561 std::vector<char> result_buffer;
1562 Status status = GetBucketMetadata(bucket, &result_buffer);
1563 Json::Value result;
1564 TF_RETURN_IF_ERROR(ParseJson(result_buffer, &result));
1565 string bucket_location;
1566 TF_RETURN_IF_ERROR(
1567 GetStringValue(result, kBucketMetadataLocationKey, &bucket_location));
1568 // Lowercase the GCS location to be case insensitive for allowed locations.
1569 *location = absl::AsciiStrToLower(bucket_location);
1570 return OkStatus();
1571 };
1572
1573 TF_RETURN_IF_ERROR(
1574 bucket_location_cache_->LookupOrCompute(bucket, location, compute_func));
1575
1576 return OkStatus();
1577 }
1578
GetBucketMetadata(const string & bucket,std::vector<char> * result_buffer)1579 Status GcsFileSystem::GetBucketMetadata(const string& bucket,
1580 std::vector<char>* result_buffer) {
1581 std::unique_ptr<HttpRequest> request;
1582 TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1583 request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket));
1584
1585 if (result_buffer != nullptr) {
1586 request->SetResultBuffer(result_buffer);
1587 }
1588
1589 request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1590 return request->Send();
1591 }
1592
FolderExists(const string & dirname,bool * result)1593 Status GcsFileSystem::FolderExists(const string& dirname, bool* result) {
1594 StatCache::ComputeFunc compute_func = [this](const string& dirname,
1595 GcsFileStat* stat) {
1596 std::vector<string> children;
1597 TF_RETURN_IF_ERROR(
1598 GetChildrenBounded(dirname, 1, &children, true /* recursively */,
1599 true /* include_self_directory_marker */));
1600 if (!children.empty()) {
1601 stat->base = DIRECTORY_STAT;
1602 return OkStatus();
1603 } else {
1604 return errors::InvalidArgument("Not a directory!");
1605 }
1606 };
1607 GcsFileStat stat;
1608 Status s = stat_cache_->LookupOrCompute(MaybeAppendSlash(dirname), &stat,
1609 compute_func);
1610 if (s.ok()) {
1611 *result = stat.base.is_directory;
1612 return OkStatus();
1613 }
1614 if (errors::IsInvalidArgument(s)) {
1615 *result = false;
1616 return OkStatus();
1617 }
1618 return s;
1619 }
1620
GetChildren(const string & dirname,TransactionToken * token,std::vector<string> * result)1621 Status GcsFileSystem::GetChildren(const string& dirname,
1622 TransactionToken* token,
1623 std::vector<string>* result) {
1624 return GetChildrenBounded(dirname, UINT64_MAX, result,
1625 false /* recursively */,
1626 false /* include_self_directory_marker */);
1627 }
1628
// Expands a GCS glob `pattern` into the set of matching full paths. Results
// are memoized in `matching_paths_cache_`, keyed by the pattern itself.
Status GcsFileSystem::GetMatchingPaths(const string& pattern,
                                       TransactionToken* token,
                                       std::vector<string>* results) {
  MatchingPathsCache::ComputeFunc compute_func =
      [this](const string& pattern, std::vector<string>* results) {
        results->clear();
        // Find the fixed prefix by looking for the first wildcard.
        const string& fixed_prefix =
            pattern.substr(0, pattern.find_first_of("*?[\\"));
        const string dir(this->Dirname(fixed_prefix));
        if (dir.empty()) {
          return errors::InvalidArgument(
              "A GCS pattern doesn't have a bucket name: ", pattern);
        }
        // One recursive listing under the fixed directory, then filter
        // client-side against the full pattern.
        std::vector<string> all_files;
        TF_RETURN_IF_ERROR(GetChildrenBounded(
            dir, UINT64_MAX, &all_files, true /* recursively */,
            false /* include_self_directory_marker */));

        // Presumably expands each object path with its intermediate "virtual
        // folder" prefixes so patterns can match directories too — TODO
        // confirm against AddAllSubpaths' definition.
        const auto& files_and_folders = AddAllSubpaths(all_files);

        // To handle `/` in the object names, we need to remove it from `dir`
        // and then use `StrCat` to insert it back.
        const StringPiece dir_no_slash = str_util::StripSuffix(dir, "/");

        // Match all obtained paths to the input pattern.
        for (const auto& path : files_and_folders) {
          // Manually construct the path instead of using `JoinPath` for the
          // cases where `path` starts with a `/` (which is a valid character in
          // the filenames of GCS objects). `JoinPath` canonicalizes the result,
          // removing duplicate slashes. We know that `dir_no_slash` does not
          // end in `/`, so we are safe inserting the new `/` here as the path
          // separator.
          const string full_path = strings::StrCat(dir_no_slash, "/", path);
          if (this->Match(full_path, pattern)) {
            results->push_back(full_path);
          }
        }
        return OkStatus();
      };
  TF_RETURN_IF_ERROR(
      matching_paths_cache_->LookupOrCompute(pattern, results, compute_func));
  return OkStatus();
}
1673
// Lists up to `max_results` children of `dirname` via the GCS objects.list
// API, following nextPageToken pagination. With `recursive` set, all objects
// under the prefix are returned; otherwise "/" is used as a delimiter so
// immediate subfolders come back in the "prefixes" field. Returned names are
// relative to `dirname`; the empty string denotes the directory's own marker
// object and is included only when `include_self_directory_marker` is set.
Status GcsFileSystem::GetChildrenBounded(const string& dirname,
                                         uint64 max_results,
                                         std::vector<string>* result,
                                         bool recursive,
                                         bool include_self_directory_marker) {
  if (!result) {
    return errors::InvalidArgument("'result' cannot be null");
  }
  string bucket, object_prefix;
  TF_RETURN_IF_ERROR(
      ParseGcsPath(MaybeAppendSlash(dirname), true, &bucket, &object_prefix));

  string nextPageToken;
  uint64 retrieved_results = 0;
  while (true) {  // A loop over multiple result pages.
    std::vector<char> output_buffer;
    std::unique_ptr<HttpRequest> request;
    TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
    auto uri = strings::StrCat(kGcsUriBase, "b/", bucket, "/o");
    if (recursive) {
      // Only object names (plus the page token) are needed in the response.
      uri = strings::StrCat(uri, "?fields=items%2Fname%2CnextPageToken");
    } else {
      // Set "/" as a delimiter to ask GCS to treat subfolders as children
      // and return them in "prefixes".
      uri = strings::StrCat(uri,
                            "?fields=items%2Fname%2Cprefixes%2CnextPageToken");
      uri = strings::StrCat(uri, "&delimiter=%2F");
    }
    if (!object_prefix.empty()) {
      uri = strings::StrCat(uri,
                            "&prefix=", request->EscapeString(object_prefix));
    }
    if (!nextPageToken.empty()) {
      uri = strings::StrCat(
          uri, "&pageToken=", request->EscapeString(nextPageToken));
    }
    // Ask the server for no more entries than we still need.
    if (max_results - retrieved_results < kGetChildrenDefaultPageSize) {
      uri =
          strings::StrCat(uri, "&maxResults=", max_results - retrieved_results);
    }
    request->SetUri(uri);
    request->SetResultBuffer(&output_buffer);
    request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);

    TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when reading ", dirname);
    Json::Value root;
    TF_RETURN_IF_ERROR(ParseJson(output_buffer, &root));
    // "items" holds the objects on this page (may be absent on empty pages).
    const auto items = root.get("items", Json::Value::null);
    if (!items.isNull()) {
      if (!items.isArray()) {
        return errors::Internal(
            "Expected an array 'items' in the GCS response.");
      }
      for (size_t i = 0; i < items.size(); i++) {
        const auto item = items.get(i, Json::Value::null);
        if (!item.isObject()) {
          return errors::Internal(
              "Unexpected JSON format: 'items' should be a list of objects.");
        }
        string name;
        TF_RETURN_IF_ERROR(GetStringValue(item, "name", &name));
        // The names should be relative to the 'dirname'. That means the
        // 'object_prefix', which is part of 'dirname', should be removed from
        // the beginning of 'name'.
        StringPiece relative_path(name);
        if (!absl::ConsumePrefix(&relative_path, object_prefix)) {
          return errors::Internal(strings::StrCat(
              "Unexpected response: the returned file name ", name,
              " doesn't match the prefix ", object_prefix));
        }
        // An empty relative path is the directory marker object itself.
        if (!relative_path.empty() || include_self_directory_marker) {
          result->emplace_back(relative_path);
        }
        if (++retrieved_results >= max_results) {
          return OkStatus();
        }
      }
    }
    const auto prefixes = root.get("prefixes", Json::Value::null);
    if (!prefixes.isNull()) {
      // Subfolders are returned for the non-recursive mode.
      if (!prefixes.isArray()) {
        return errors::Internal(
            "'prefixes' was expected to be an array in the GCS response.");
      }
      for (size_t i = 0; i < prefixes.size(); i++) {
        const auto prefix = prefixes.get(i, Json::Value::null);
        if (prefix.isNull() || !prefix.isString()) {
          return errors::Internal(
              "'prefixes' was expected to be an array of strings in the GCS "
              "response.");
        }
        const string& prefix_str = prefix.asString();
        StringPiece relative_path(prefix_str);
        if (!absl::ConsumePrefix(&relative_path, object_prefix)) {
          return errors::Internal(
              "Unexpected response: the returned folder name ", prefix_str,
              " doesn't match the prefix ", object_prefix);
        }
        result->emplace_back(relative_path);
        if (++retrieved_results >= max_results) {
          return OkStatus();
        }
      }
    }
    // No nextPageToken means this was the final page.
    const auto token = root.get("nextPageToken", Json::Value::null);
    if (token.isNull()) {
      return OkStatus();
    }
    if (!token.isString()) {
      return errors::Internal(
          "Unexpected response: nextPageToken is not a string");
    }
    nextPageToken = token.asString();
  }
}
1790
Stat(const string & fname,TransactionToken * token,FileStatistics * stat)1791 Status GcsFileSystem::Stat(const string& fname, TransactionToken* token,
1792 FileStatistics* stat) {
1793 if (!stat) {
1794 return errors::Internal("'stat' cannot be nullptr.");
1795 }
1796 string bucket, object;
1797 TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object));
1798 if (object.empty()) {
1799 bool is_bucket;
1800 TF_RETURN_IF_ERROR(BucketExists(bucket, &is_bucket));
1801 if (is_bucket) {
1802 *stat = DIRECTORY_STAT;
1803 return OkStatus();
1804 }
1805 return errors::NotFound("The specified bucket ", fname, " was not found.");
1806 }
1807
1808 GcsFileStat gcs_stat;
1809 const Status status = StatForObject(fname, bucket, object, &gcs_stat);
1810 if (status.ok()) {
1811 *stat = gcs_stat.base;
1812 return OkStatus();
1813 }
1814 if (!errors::IsNotFound(status)) {
1815 return status;
1816 }
1817 bool is_folder;
1818 TF_RETURN_IF_ERROR(FolderExists(fname, &is_folder));
1819 if (is_folder) {
1820 *stat = DIRECTORY_STAT;
1821 return OkStatus();
1822 }
1823 return errors::NotFound("The specified path ", fname, " was not found.");
1824 }
1825
DeleteFile(const string & fname,TransactionToken * token)1826 Status GcsFileSystem::DeleteFile(const string& fname, TransactionToken* token) {
1827 string bucket, object;
1828 TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
1829
1830 std::unique_ptr<HttpRequest> request;
1831 TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1832 request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket, "/o/",
1833 request->EscapeString(object)));
1834 request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1835 request->SetDeleteRequest();
1836
1837 TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when deleting ", fname);
1838 ClearFileCaches(fname);
1839 return OkStatus();
1840 }
1841
// Creates a directory marker object ("<dirname>/", zero bytes) for `dirname`.
// For a bare bucket path, only verifies the bucket exists. Returns
// AlreadyExists when the marker (or an equally named object) is already
// present.
Status GcsFileSystem::CreateDir(const string& dirname,
                                TransactionToken* token) {
  string dirname_with_slash = MaybeAppendSlash(dirname);
  VLOG(3) << "CreateDir: creating directory with dirname: " << dirname
          << " and dirname_with_slash: " << dirname_with_slash;
  string bucket, object;
  TF_RETURN_IF_ERROR(ParseGcsPath(dirname_with_slash, /*empty_object_ok=*/true,
                                  &bucket, &object));
  // Bucket root: nothing to upload; success iff the bucket exists.
  if (object.empty()) {
    bool is_bucket;
    TF_RETURN_IF_ERROR(BucketExists(bucket, &is_bucket));
    return is_bucket ? OkStatus()
                     : errors::NotFound("The specified bucket ",
                                        dirname_with_slash, " was not found.");
  }

  if (FileExists(dirname_with_slash, token).ok()) {
    // Use the original name for a correct error here.
    VLOG(3) << "CreateDir: directory already exists, not uploading " << dirname;
    return errors::AlreadyExists(dirname);
  }

  std::unique_ptr<HttpRequest> request;
  TF_RETURN_IF_ERROR(CreateHttpRequest(&request));

  request->SetUri(strings::StrCat(
      kGcsUploadUriBase, "b/", bucket,
      "/o?uploadType=media&name=", request->EscapeString(object),
      // Adding this parameter means HTTP_CODE_PRECONDITION_FAILED
      // will be returned if the object already exists, so avoid reuploading.
      "&ifGenerationMatch=0"));

  request->SetPostEmptyBody();
  request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
  const Status& status = request->Send();
  if (status.ok()) {
    VLOG(3) << "CreateDir: finished uploading directory " << dirname;
    return OkStatus();
  }
  // PRECONDITION_FAILED here means another writer created the marker between
  // our existence check and the upload; treat it as AlreadyExists rather
  // than an error. Any other failure is propagated with context.
  if (request->GetResponseCode() != HTTP_CODE_PRECONDITION_FAILED) {
    TF_RETURN_WITH_CONTEXT_IF_ERROR(status, " when uploading ",
                                    dirname_with_slash);
  }
  VLOG(3) << "Ignoring directory already exists on object "
          << dirname_with_slash;
  return errors::AlreadyExists(dirname);
}
1889
1890 // Checks that the directory is empty (i.e no objects with this prefix exist).
1891 // Deletes the GCS directory marker if it exists.
DeleteDir(const string & dirname,TransactionToken * token)1892 Status GcsFileSystem::DeleteDir(const string& dirname,
1893 TransactionToken* token) {
1894 std::vector<string> children;
1895 // A directory is considered empty either if there are no matching objects
1896 // with the corresponding name prefix or if there is exactly one matching
1897 // object and it is the directory marker. Therefore we need to retrieve
1898 // at most two children for the prefix to detect if a directory is empty.
1899 TF_RETURN_IF_ERROR(
1900 GetChildrenBounded(dirname, 2, &children, true /* recursively */,
1901 true /* include_self_directory_marker */));
1902
1903 if (children.size() > 1 || (children.size() == 1 && !children[0].empty())) {
1904 return errors::FailedPrecondition("Cannot delete a non-empty directory.");
1905 }
1906 if (children.size() == 1 && children[0].empty()) {
1907 // This is the directory marker object. Delete it.
1908 return DeleteFile(MaybeAppendSlash(dirname), token);
1909 }
1910 return OkStatus();
1911 }
1912
GetFileSize(const string & fname,TransactionToken * token,uint64 * file_size)1913 Status GcsFileSystem::GetFileSize(const string& fname, TransactionToken* token,
1914 uint64* file_size) {
1915 if (!file_size) {
1916 return errors::Internal("'file_size' cannot be nullptr.");
1917 }
1918
1919 // Only validate the name.
1920 string bucket, object;
1921 TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
1922
1923 FileStatistics stat;
1924 TF_RETURN_IF_ERROR(Stat(fname, token, &stat));
1925 *file_size = stat.length;
1926 return OkStatus();
1927 }
1928
RenameFile(const string & src,const string & target,TransactionToken * token)1929 Status GcsFileSystem::RenameFile(const string& src, const string& target,
1930 TransactionToken* token) {
1931 if (!IsDirectory(src, token).ok()) {
1932 return RenameObject(src, target);
1933 }
1934 // Rename all individual objects in the directory one by one.
1935 std::vector<string> children;
1936 TF_RETURN_IF_ERROR(
1937 GetChildrenBounded(src, UINT64_MAX, &children, true /* recursively */,
1938 true /* include_self_directory_marker */));
1939 for (const string& subpath : children) {
1940 TF_RETURN_IF_ERROR(
1941 RenameObject(JoinGcsPath(src, subpath), JoinGcsPath(target, subpath)));
1942 }
1943 return OkStatus();
1944 }
1945
// Uses a GCS API command to copy the object and then deletes the old one.
Status GcsFileSystem::RenameObject(const string& src, const string& target) {
  VLOG(3) << "RenameObject: started gs://" << src << " to " << target;
  string src_bucket, src_object, target_bucket, target_object;
  TF_RETURN_IF_ERROR(ParseGcsPath(src, false, &src_bucket, &src_object));
  TF_RETURN_IF_ERROR(
      ParseGcsPath(target, false, &target_bucket, &target_object));

  // Server-side copy via the objects.rewrite API; no data passes through
  // this client.
  std::unique_ptr<HttpRequest> request;
  TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
  request->SetUri(strings::StrCat(kGcsUriBase, "b/", src_bucket, "/o/",
                                  request->EscapeString(src_object),
                                  "/rewriteTo/b/", target_bucket, "/o/",
                                  request->EscapeString(target_object)));
  request->SetPostEmptyBody();
  request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
  std::vector<char> output_buffer;
  request->SetResultBuffer(&output_buffer);
  TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when renaming ", src,
                                  " to ", target);
  // Flush the target from the caches. The source will be flushed in the
  // DeleteFile call below.
  ClearFileCaches(target);
  Json::Value root;
  TF_RETURN_IF_ERROR(ParseJson(output_buffer, &root));
  // The rewrite response's "done" flag tells us whether the copy completed
  // in this single call.
  bool done;
  TF_RETURN_IF_ERROR(GetBoolValue(root, "done", &done));
  if (!done) {
    // If GCS didn't complete rewrite in one call, this means that a large file
    // is being copied to a bucket with a different storage class or location,
    // which requires multiple rewrite calls.
    // TODO(surkov): implement multi-step rewrites.
    return errors::Unimplemented(
        "Couldn't rename ", src, " to ", target,
        ": moving large files between buckets with different "
        "locations or storage classes is not supported.");
  }

  VLOG(3) << "RenameObject: finished from: gs://" << src << " to " << target;
  // In case the delete API call failed, but the deletion actually happened
  // on the server side, we can't just retry the whole RenameFile operation
  // because the source object is already gone.
  return RetryingUtils::DeleteWithRetries(
      [this, &src]() { return DeleteFile(src, nullptr); }, retry_config_);
}
1991
IsDirectory(const string & fname,TransactionToken * token)1992 Status GcsFileSystem::IsDirectory(const string& fname,
1993 TransactionToken* token) {
1994 string bucket, object;
1995 TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object));
1996 if (object.empty()) {
1997 bool is_bucket;
1998 TF_RETURN_IF_ERROR(BucketExists(bucket, &is_bucket));
1999 if (is_bucket) {
2000 return OkStatus();
2001 }
2002 return errors::NotFound("The specified bucket gs://", bucket,
2003 " was not found.");
2004 }
2005 bool is_folder;
2006 TF_RETURN_IF_ERROR(FolderExists(fname, &is_folder));
2007 if (is_folder) {
2008 return OkStatus();
2009 }
2010 bool is_object;
2011 TF_RETURN_IF_ERROR(ObjectExists(fname, bucket, object, &is_object));
2012 if (is_object) {
2013 return errors::FailedPrecondition("The specified path ", fname,
2014 " is not a directory.");
2015 }
2016 return errors::NotFound("The specified path ", fname, " was not found.");
2017 }
2018
DeleteRecursively(const string & dirname,TransactionToken * token,int64_t * undeleted_files,int64_t * undeleted_dirs)2019 Status GcsFileSystem::DeleteRecursively(const string& dirname,
2020 TransactionToken* token,
2021 int64_t* undeleted_files,
2022 int64_t* undeleted_dirs) {
2023 if (!undeleted_files || !undeleted_dirs) {
2024 return errors::Internal(
2025 "'undeleted_files' and 'undeleted_dirs' cannot be nullptr.");
2026 }
2027 *undeleted_files = 0;
2028 *undeleted_dirs = 0;
2029 if (!IsDirectory(dirname, token).ok()) {
2030 *undeleted_dirs = 1;
2031 return Status(
2032 error::NOT_FOUND,
2033 strings::StrCat(dirname, " doesn't exist or not a directory."));
2034 }
2035 std::vector<string> all_objects;
2036 // Get all children in the directory recursively.
2037 TF_RETURN_IF_ERROR(GetChildrenBounded(
2038 dirname, UINT64_MAX, &all_objects, true /* recursively */,
2039 true /* include_self_directory_marker */));
2040 for (const string& object : all_objects) {
2041 const string& full_path = JoinGcsPath(dirname, object);
2042 // Delete all objects including directory markers for subfolders.
2043 // Since DeleteRecursively returns OK if individual file deletions fail,
2044 // and therefore RetryingFileSystem won't pay attention to the failures,
2045 // we need to make sure these failures are properly retried.
2046 const auto& delete_file_status = RetryingUtils::DeleteWithRetries(
2047 [this, &full_path, token]() { return DeleteFile(full_path, token); },
2048 retry_config_);
2049 if (!delete_file_status.ok()) {
2050 if (IsDirectory(full_path, token).ok()) {
2051 // The object is a directory marker.
2052 (*undeleted_dirs)++;
2053 } else {
2054 (*undeleted_files)++;
2055 }
2056 }
2057 }
2058 return OkStatus();
2059 }
2060
2061 // Flushes all caches for filesystem metadata and file contents. Useful for
2062 // reclaiming memory once filesystem operations are done (e.g. model is loaded),
2063 // or for resetting the filesystem to a consistent state.
FlushCaches(TransactionToken * token)2064 void GcsFileSystem::FlushCaches(TransactionToken* token) {
2065 tf_shared_lock l(block_cache_lock_);
2066 file_block_cache_->Flush();
2067 stat_cache_->Clear();
2068 matching_paths_cache_->Clear();
2069 bucket_location_cache_->Clear();
2070 }
2071
SetStats(GcsStatsInterface * stats)2072 void GcsFileSystem::SetStats(GcsStatsInterface* stats) {
2073 CHECK(stats_ == nullptr) << "SetStats() has already been called.";
2074 CHECK(stats != nullptr);
2075 mutex_lock l(block_cache_lock_);
2076 stats_ = stats;
2077 stats_->Configure(this, &throttle_, file_block_cache_.get());
2078 }
2079
SetCacheStats(FileBlockCacheStatsInterface * cache_stats)2080 void GcsFileSystem::SetCacheStats(FileBlockCacheStatsInterface* cache_stats) {
2081 tf_shared_lock l(block_cache_lock_);
2082 if (file_block_cache_ == nullptr) {
2083 LOG(ERROR) << "Tried to set cache stats of non-initialized file block "
2084 "cache object. This may result in not exporting the intended "
2085 "monitoring data";
2086 return;
2087 }
2088 file_block_cache_->SetStats(cache_stats);
2089 }
2090
SetAuthProvider(std::unique_ptr<AuthProvider> auth_provider)2091 void GcsFileSystem::SetAuthProvider(
2092 std::unique_ptr<AuthProvider> auth_provider) {
2093 mutex_lock l(mu_);
2094 auth_provider_ = std::move(auth_provider);
2095 }
2096
// Creates an HttpRequest and sets several parameters that are common to all
// requests. All code (in GcsFileSystem) that creates an HttpRequest should
// go through this method, rather than directly using http_request_factory_.
Status GcsFileSystem::CreateHttpRequest(std::unique_ptr<HttpRequest>* request) {
  std::unique_ptr<HttpRequest> new_request{http_request_factory_->Create()};
  if (dns_cache_) {
    // Reuse cached DNS resolutions when a DNS cache was configured.
    dns_cache_->AnnotateRequest(new_request.get());
  }

  // Fetch the auth token under the shared lock; SetAuthProvider() may swap
  // the provider concurrently under the same mutex.
  string auth_token;
  {
    tf_shared_lock l(mu_);
    TF_RETURN_IF_ERROR(
        AuthProvider::GetToken(auth_provider_.get(), &auth_token));
  }

  new_request->AddAuthBearerHeader(auth_token);

  if (additional_header_) {
    new_request->AddHeader(additional_header_->first,
                           additional_header_->second);
  }

  if (stats_ != nullptr) {
    new_request->SetRequestStats(stats_->HttpStats());
  }

  // Client-side rate limiting: refuse the request before it is even sent.
  if (!throttle_.AdmitRequest()) {
    return errors::Unavailable("Request throttled");
  }

  *request = std::move(new_request);
  return OkStatus();
}
2131
2132 } // namespace tensorflow
2133
// The TPU_GCS_FS option sets a TPU-on-GCS optimized file system that allows
// TPU pods to function more optimally. When TPU_GCS_FS is enabled then
// gcs_file_system will not be registered as a file system since the
// tpu_gcs_file_system is going to take over its responsibilities. The tpu file
// system is a child of gcs file system with TPU-pod on GCS optimizations.
// This option is set ON/OFF in the GCP TPU tensorflow config.
// Initialize gcs_file_system
// NOTE(review): the retrying wrapper is registered (rather than the plain
// GcsFileSystem), presumably so transient GCS errors on "gs://" paths are
// retried transparently — confirm against RetryingGcsFileSystem's definition.
REGISTER_LEGACY_FILE_SYSTEM("gs", ::tensorflow::RetryingGcsFileSystem);