1 /* Copyright 2020 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 #include "tensorflow/c/experimental/filesystem/plugins/gcs/gcs_filesystem.h"
16
17 #include <stdlib.h>
18 #include <string.h>
19
20 #include "absl/strings/numbers.h"
21 #include "absl/strings/str_cat.h"
22 #include "absl/types/variant.h"
23 #include "google/cloud/storage/client.h"
24 #include "tensorflow/c/env.h"
25 #include "tensorflow/c/experimental/filesystem/plugins/gcs/gcs_helper.h"
26 #include "tensorflow/c/logging.h"
27 #include "tensorflow/c/tf_status.h"
28
29 // Implementation of a filesystem for GCS environments.
30 // This filesystem will support `gs://` URI schemes.
31 namespace gcs = google::cloud::storage;
32
// The environment variable that overrides the block size for aligned reads from
// GCS. Specified in MB (e.g. "16" = 16 x 1024 x 1024 = 16777216 bytes).
constexpr char kBlockSize[] = "GCS_READ_CACHE_BLOCK_SIZE_MB";
// Block size, in bytes, used when `kBlockSize` is not set (64 MB).
constexpr size_t kDefaultBlockSize = 64 * 1024 * 1024;
// The environment variable that overrides the max size of the LRU cache of
// blocks read from GCS. Specified in MB.
constexpr char kMaxCacheSize[] = "GCS_READ_CACHE_MAX_SIZE_MB";
// Max cache size, in bytes, used when `kMaxCacheSize` is not set.
// NOTE(review): presumably a size of 0 disables the block cache -- confirm
// against the block cache implementation.
constexpr size_t kDefaultMaxCacheSize = 0;
// The environment variable that overrides the maximum staleness of cached file
// contents. Once any block of a file reaches this staleness, all cached blocks
// will be evicted on the next read.
constexpr char kMaxStaleness[] = "GCS_READ_CACHE_MAX_STALENESS";
// Max staleness used when `kMaxStaleness` is not set.
constexpr uint64_t kDefaultMaxStaleness = 0;

// The environment variable that overrides the maximum age of entries in the
// Stat cache.
// NOTE(review): the unit is not visible here -- presumably seconds; confirm.
constexpr char kStatCacheMaxAge[] = "GCS_STAT_CACHE_MAX_AGE";
// Max age of Stat cache entries used when `kStatCacheMaxAge` is not set.
constexpr uint64_t kStatCacheDefaultMaxAge = 5;
// The environment variable that overrides the maximum number of entries in the
// Stat cache.
constexpr char kStatCacheMaxEntries[] = "GCS_STAT_CACHE_MAX_ENTRIES";
// Max number of Stat cache entries used when `kStatCacheMaxEntries` is not set.
constexpr size_t kStatCacheDefaultMaxEntries = 1024;

// How to upload new data when Flush() is called multiple times.
// By default the entire file is reuploaded.
constexpr char kAppendMode[] = "GCS_APPEND_MODE";
// If GCS_APPEND_MODE=compose then instead the new data is uploaded to a
// temporary object and composed with the original object. This is disabled by
// default as the multiple API calls required add a risk of stranding temporary
// objects.
constexpr char kComposeAppend[] = "compose";
62
63 // We can cast `google::cloud::StatusCode` to `TF_Code` because they have the
64 // same integer values. See
65 // https://github.com/googleapis/google-cloud-cpp/blob/6c09cbfa0160bc046e5509b4dd2ab4b872648b4a/google/cloud/status.h#L32-L52
TF_SetStatusFromGCSStatus(const google::cloud::Status & gcs_status,TF_Status * status)66 static inline void TF_SetStatusFromGCSStatus(
67 const google::cloud::Status& gcs_status, TF_Status* status) {
68 TF_SetStatus(status, static_cast<TF_Code>(gcs_status.code()),
69 gcs_status.message().c_str());
70 }
71
// Returns a block of `size` zero-initialized bytes obtained from `calloc`.
// The block must be released with `plugin_memory_free`.
static void* plugin_memory_allocate(size_t size) {
  void* const zeroed_block = calloc(1, size);
  return zeroed_block;
}
// Releases a block with `free`; a null `ptr` is accepted.
static void plugin_memory_free(void* ptr) {
  free(ptr);
}
74
ParseGCSPath(const std::string & fname,bool object_empty_ok,std::string * bucket,std::string * object,TF_Status * status)75 void ParseGCSPath(const std::string& fname, bool object_empty_ok,
76 std::string* bucket, std::string* object, TF_Status* status) {
77 size_t scheme_end = fname.find("://") + 2;
78 if (fname.substr(0, scheme_end + 1) != "gs://") {
79 TF_SetStatus(status, TF_INVALID_ARGUMENT,
80 "GCS path doesn't start with 'gs://'.");
81 return;
82 }
83
84 size_t bucket_end = fname.find('/', scheme_end + 1);
85 if (bucket_end == std::string::npos) {
86 TF_SetStatus(status, TF_INVALID_ARGUMENT,
87 "GCS path doesn't contain a bucket name.");
88 return;
89 }
90
91 *bucket = fname.substr(scheme_end + 1, bucket_end - scheme_end - 1);
92 *object = fname.substr(bucket_end + 1);
93
94 if (object->empty() && !object_empty_ok) {
95 TF_SetStatus(status, TF_INVALID_ARGUMENT,
96 "GCS path doesn't contain an object name.");
97 }
98 }
99
/// Makes sure `name` ends with '/'. An empty name becomes "/".
static void MaybeAppendSlash(std::string* name) {
  const bool already_terminated = !name->empty() && name->back() == '/';
  if (!already_terminated) name->push_back('/');
}
107
108 // A helper function to actually read the data from GCS.
LoadBufferFromGCS(const std::string & path,size_t offset,size_t buffer_size,char * buffer,tf_gcs_filesystem::GCSFile * gcs_file,TF_Status * status)109 static int64_t LoadBufferFromGCS(const std::string& path, size_t offset,
110 size_t buffer_size, char* buffer,
111 tf_gcs_filesystem::GCSFile* gcs_file,
112 TF_Status* status) {
113 std::string bucket, object;
114 ParseGCSPath(path, false, &bucket, &object, status);
115 if (TF_GetCode(status) != TF_OK) return -1;
116 auto stream = gcs_file->gcs_client.ReadObject(
117 bucket, object, gcs::ReadRange(offset, offset + buffer_size));
118 TF_SetStatusFromGCSStatus(stream.status(), status);
119 if ((TF_GetCode(status) != TF_OK) &&
120 (TF_GetCode(status) != TF_OUT_OF_RANGE)) {
121 return -1;
122 }
123 int64_t read;
124 auto content_length = stream.headers().find("content-length");
125 if (content_length == stream.headers().end()) {
126 // When we read a file with offset that is bigger than the actual file size.
127 // GCS will return an empty header (e.g no `content-length` header). In this
128 // case, we will set read to `0` and continue.
129 read = 0;
130 } else if (!absl::SimpleAtoi(content_length->second, &read)) {
131 TF_SetStatus(status, TF_UNKNOWN, "Could not get content-length header");
132 return -1;
133 }
134 // `TF_OUT_OF_RANGE` isn't considered as an error. So we clear it here.
135 TF_SetStatus(status, TF_OK, "");
136 TF_VLog(1, "Successful read of %s @ %u of size: %u", path.c_str(), offset,
137 read);
138 stream.read(buffer, read);
139 read = stream.gcount();
140 if (read < buffer_size) {
141 // Check stat cache to see if we encountered an interrupted read.
142 tf_gcs_filesystem::GcsFileStat stat;
143 if (gcs_file->stat_cache->Lookup(path, &stat)) {
144 if (offset + read < stat.base.length) {
145 TF_SetStatus(status, TF_INTERNAL,
146 absl::StrCat("File contents are inconsistent for file: ",
147 path, " @ ", offset)
148 .c_str());
149 }
150 TF_VLog(2, "Successful integrity check for: %s @ %u", path.c_str(),
151 offset);
152 }
153 }
154 return read;
155 }
156
157 // SECTION 1. Implementation for `TF_RandomAccessFile`
158 // ----------------------------------------------------------------------------
159 namespace tf_random_access_file {
160 using ReadFn =
161 std::function<int64_t(const std::string& path, uint64_t offset, size_t n,
162 char* buffer, TF_Status* status)>;
163 typedef struct GCSFile {
164 const std::string path;
165 const bool is_cache_enable;
166 const uint64_t buffer_size;
167 ReadFn read_fn;
168 absl::Mutex buffer_mutex;
169 uint64_t buffer_start ABSL_GUARDED_BY(buffer_mutex);
170 bool buffer_end_is_past_eof ABSL_GUARDED_BY(buffer_mutex);
171 std::string buffer ABSL_GUARDED_BY(buffer_mutex);
172
GCSFiletf_random_access_file::GCSFile173 GCSFile(std::string path, bool is_cache_enable, uint64_t buffer_size,
174 ReadFn read_fn)
175 : path(path),
176 is_cache_enable(is_cache_enable),
177 buffer_size(buffer_size),
178 read_fn(std::move(read_fn)),
179 buffer_mutex(),
180 buffer_start(0),
181 buffer_end_is_past_eof(false),
182 buffer() {}
183 } GCSFile;
184
Cleanup(TF_RandomAccessFile * file)185 void Cleanup(TF_RandomAccessFile* file) {
186 auto gcs_file = static_cast<GCSFile*>(file->plugin_file);
187 delete gcs_file;
188 }
189
190 // `google-cloud-cpp` is working on a feature that we may want to use.
191 // See https://github.com/googleapis/google-cloud-cpp/issues/4013.
Read(const TF_RandomAccessFile * file,uint64_t offset,size_t n,char * buffer,TF_Status * status)192 int64_t Read(const TF_RandomAccessFile* file, uint64_t offset, size_t n,
193 char* buffer, TF_Status* status) {
194 auto gcs_file = static_cast<GCSFile*>(file->plugin_file);
195 if (gcs_file->is_cache_enable || n > gcs_file->buffer_size) {
196 return gcs_file->read_fn(gcs_file->path, offset, n, buffer, status);
197 } else {
198 absl::MutexLock l(&gcs_file->buffer_mutex);
199 size_t buffer_end = gcs_file->buffer_start + gcs_file->buffer.size();
200 size_t copy_size = 0;
201 if (offset < buffer_end && gcs_file->buffer_start) {
202 copy_size = (std::min)(n, static_cast<size_t>(buffer_end - offset));
203 memcpy(buffer,
204 gcs_file->buffer.data() + (offset - gcs_file->buffer_start),
205 copy_size);
206 }
207 bool consumed_buffer_to_eof =
208 offset + copy_size >= buffer_end && gcs_file->buffer_end_is_past_eof;
209 if (copy_size < n && !consumed_buffer_to_eof) {
210 gcs_file->buffer_start = offset + copy_size;
211 gcs_file->buffer.resize(gcs_file->buffer_size);
212 auto read_fill_buffer = gcs_file->read_fn(
213 gcs_file->path, gcs_file->buffer_start, gcs_file->buffer_size,
214 &(gcs_file->buffer[0]), status);
215 gcs_file->buffer_end_is_past_eof =
216 (TF_GetCode(status) == TF_OUT_OF_RANGE);
217 if (read_fill_buffer >= 0) gcs_file->buffer.resize(read_fill_buffer);
218 if (TF_GetCode(status) != TF_OK &&
219 TF_GetCode(status) != TF_OUT_OF_RANGE) {
220 // Empty the buffer to avoid caching bad reads.
221 gcs_file->buffer.resize(0);
222 return -1;
223 }
224 size_t remaining_copy =
225 (std::min)(n - copy_size, gcs_file->buffer.size());
226 memcpy(buffer + copy_size, gcs_file->buffer.data(), remaining_copy);
227 copy_size += remaining_copy;
228 }
229 if (copy_size < n) {
230 // Forget the end-of-file flag to allow for clients that poll on the
231 // same file.
232 gcs_file->buffer_end_is_past_eof = false;
233 TF_SetStatus(status, TF_OUT_OF_RANGE, "Read less bytes than requested");
234 return copy_size;
235 }
236 TF_SetStatus(status, TF_OK, "");
237 return copy_size;
238 }
239 }
240
241 } // namespace tf_random_access_file
242
243 // SECTION 2. Implementation for `TF_WritableFile`
244 // ----------------------------------------------------------------------------
245 namespace tf_writable_file {
246 typedef struct GCSFile {
247 const std::string bucket;
248 const std::string object;
249 gcs::Client* gcs_client; // not owned
250 TempFile outfile;
251 bool sync_need;
252 // `offset` tells us how many bytes of this file are already uploaded to
253 // server. If `offset == -1`, we always upload the entire temporary file.
254 int64_t offset;
255 } GCSFile;
256
SyncImpl(const std::string & bucket,const std::string & object,int64_t * offset,TempFile * outfile,gcs::Client * gcs_client,TF_Status * status)257 static void SyncImpl(const std::string& bucket, const std::string& object,
258 int64_t* offset, TempFile* outfile,
259 gcs::Client* gcs_client, TF_Status* status) {
260 outfile->flush();
261 // `*offset == 0` means this file does not exist on the server.
262 if (*offset == -1 || *offset == 0) {
263 // UploadFile will automatically switch to resumable upload based on Client
264 // configuration.
265 auto metadata = gcs_client->UploadFile(outfile->getName(), bucket, object,
266 gcs::Fields("size"));
267 if (!metadata) {
268 TF_SetStatusFromGCSStatus(metadata.status(), status);
269 return;
270 }
271 if (*offset == 0) {
272 if (!outfile->truncate()) {
273 TF_SetStatus(status, TF_INTERNAL,
274 "Could not truncate internal temporary file.");
275 return;
276 }
277 *offset = static_cast<int64_t>(metadata->size());
278 }
279 outfile->clear();
280 outfile->seekp(0, std::ios::end);
281 TF_SetStatus(status, TF_OK, "");
282 } else {
283 std::string temporary_object =
284 gcs::CreateRandomPrefixName("tf_writable_file_gcs");
285 auto metadata = gcs_client->UploadFile(outfile->getName(), bucket,
286 temporary_object, gcs::Fields(""));
287 if (!metadata) {
288 TF_SetStatusFromGCSStatus(metadata.status(), status);
289 return;
290 }
291 TF_VLog(3, "AppendObject: gs://%s/%s to gs://%s/%s", bucket.c_str(),
292 temporary_object.c_str(), bucket.c_str(), object.c_str());
293 const std::vector<gcs::ComposeSourceObject> source_objects = {
294 {object, {}, {}}, {temporary_object, {}, {}}};
295 metadata = gcs_client->ComposeObject(bucket, source_objects, object,
296 gcs::Fields("size"));
297 if (!metadata) {
298 TF_SetStatusFromGCSStatus(metadata.status(), status);
299 return;
300 }
301 // We have to delete the temporary object after composing.
302 auto delete_status = gcs_client->DeleteObject(bucket, temporary_object);
303 if (!delete_status.ok()) {
304 TF_SetStatusFromGCSStatus(delete_status, status);
305 return;
306 }
307 // We truncate the data that are already uploaded.
308 if (!outfile->truncate()) {
309 TF_SetStatus(status, TF_INTERNAL,
310 "Could not truncate internal temporary file.");
311 return;
312 }
313 *offset = static_cast<int64_t>(metadata->size());
314 TF_SetStatus(status, TF_OK, "");
315 }
316 }
317
Cleanup(TF_WritableFile * file)318 void Cleanup(TF_WritableFile* file) {
319 auto gcs_file = static_cast<GCSFile*>(file->plugin_file);
320 delete gcs_file;
321 }
322
Append(const TF_WritableFile * file,const char * buffer,size_t n,TF_Status * status)323 void Append(const TF_WritableFile* file, const char* buffer, size_t n,
324 TF_Status* status) {
325 auto gcs_file = static_cast<GCSFile*>(file->plugin_file);
326 if (!gcs_file->outfile.is_open()) {
327 TF_SetStatus(status, TF_FAILED_PRECONDITION,
328 "The internal temporary file is not writable.");
329 return;
330 }
331 TF_VLog(3, "Append: gs://%s/%s size %u", gcs_file->bucket.c_str(),
332 gcs_file->object.c_str(), n);
333 gcs_file->sync_need = true;
334 gcs_file->outfile.write(buffer, n);
335 if (!gcs_file->outfile)
336 TF_SetStatus(status, TF_INTERNAL,
337 "Could not append to the internal temporary file.");
338 else
339 TF_SetStatus(status, TF_OK, "");
340 }
341
Tell(const TF_WritableFile * file,TF_Status * status)342 int64_t Tell(const TF_WritableFile* file, TF_Status* status) {
343 auto gcs_file = static_cast<GCSFile*>(file->plugin_file);
344 int64_t position = int64_t(gcs_file->outfile.tellp());
345 if (position == -1)
346 TF_SetStatus(status, TF_INTERNAL,
347 "tellp on the internal temporary file failed");
348 else
349 TF_SetStatus(status, TF_OK, "");
350 return position == -1
351 ? -1
352 : position + (gcs_file->offset == -1 ? 0 : gcs_file->offset);
353 }
354
Flush(const TF_WritableFile * file,TF_Status * status)355 void Flush(const TF_WritableFile* file, TF_Status* status) {
356 auto gcs_file = static_cast<GCSFile*>(file->plugin_file);
357 if (gcs_file->sync_need) {
358 TF_VLog(3, "Flush started: gs://%s/%s", gcs_file->bucket.c_str(),
359 gcs_file->object.c_str());
360 if (!gcs_file->outfile) {
361 TF_SetStatus(status, TF_INTERNAL,
362 "Could not append to the internal temporary file.");
363 return;
364 }
365 SyncImpl(gcs_file->bucket, gcs_file->object, &gcs_file->offset,
366 &gcs_file->outfile, gcs_file->gcs_client, status);
367 TF_VLog(3, "Flush finished: gs://%s/%s", gcs_file->bucket.c_str(),
368 gcs_file->object.c_str());
369 if (TF_GetCode(status) != TF_OK) return;
370 gcs_file->sync_need = false;
371 } else {
372 TF_SetStatus(status, TF_OK, "");
373 }
374 }
375
Sync(const TF_WritableFile * file,TF_Status * status)376 void Sync(const TF_WritableFile* file, TF_Status* status) {
377 auto gcs_file = static_cast<GCSFile*>(file->plugin_file);
378 TF_VLog(3, "Sync: gs://%s/%s", gcs_file->bucket.c_str(),
379 gcs_file->object.c_str());
380 Flush(file, status);
381 }
382
Close(const TF_WritableFile * file,TF_Status * status)383 void Close(const TF_WritableFile* file, TF_Status* status) {
384 auto gcs_file = static_cast<GCSFile*>(file->plugin_file);
385 TF_VLog(3, "Close: gs://%s/%s", gcs_file->bucket.c_str(),
386 gcs_file->object.c_str());
387 if (gcs_file->sync_need) {
388 Flush(file, status);
389 }
390 gcs_file->outfile.close();
391 }
392
393 } // namespace tf_writable_file
394
395 // SECTION 3. Implementation for `TF_ReadOnlyMemoryRegion`
396 // ----------------------------------------------------------------------------
397 namespace tf_read_only_memory_region {
// Immutable description of a memory region: the start of the data and its
// size in bytes. `Cleanup` releases `address` with `plugin_memory_free`.
struct GCSMemoryRegion {
  const void* const address;
  const uint64_t length;
};
402
Cleanup(TF_ReadOnlyMemoryRegion * region)403 void Cleanup(TF_ReadOnlyMemoryRegion* region) {
404 auto r = static_cast<GCSMemoryRegion*>(region->plugin_memory_region);
405 plugin_memory_free(const_cast<void*>(r->address));
406 delete r;
407 }
408
Data(const TF_ReadOnlyMemoryRegion * region)409 const void* Data(const TF_ReadOnlyMemoryRegion* region) {
410 auto r = static_cast<GCSMemoryRegion*>(region->plugin_memory_region);
411 return r->address;
412 }
413
Length(const TF_ReadOnlyMemoryRegion * region)414 uint64_t Length(const TF_ReadOnlyMemoryRegion* region) {
415 auto r = static_cast<GCSMemoryRegion*>(region->plugin_memory_region);
416 return r->length;
417 }
418
419 } // namespace tf_read_only_memory_region
420
421 // SECTION 4. Implementation for `TF_Filesystem`, the actual filesystem
422 // ----------------------------------------------------------------------------
423 namespace tf_gcs_filesystem {
// TODO(vnvo2409): Use partial response for better performance.
425 // TODO(vnvo2409): We could do some cleanups like `return TF_SetStatus`.
426 // TODO(vnvo2409): Refactor the filesystem implementation when
427 // https://github.com/googleapis/google-cloud-cpp/issues/4482 is done.
GCSFile(google::cloud::storage::Client && gcs_client)428 GCSFile::GCSFile(google::cloud::storage::Client&& gcs_client)
429 : gcs_client(gcs_client), block_cache_lock() {
430 const char* append_mode = std::getenv(kAppendMode);
431 compose = (append_mode != nullptr) && (!strcmp(kAppendMode, append_mode));
432
433 uint64_t value;
434 block_size = kDefaultBlockSize;
435 size_t max_bytes = kDefaultMaxCacheSize;
436 uint64_t max_staleness = kDefaultMaxStaleness;
437
438 // Apply the overrides for the block size (MB), max bytes (MB), and max
439 // staleness (seconds) if provided.
440 const char* block_size_env = std::getenv(kBlockSize);
441 if (block_size_env && absl::SimpleAtoi(block_size_env, &value)) {
442 block_size = value * 1024 * 1024;
443 }
444 const char* max_bytes_env = std::getenv(kMaxCacheSize);
445 if (max_bytes_env && absl::SimpleAtoi(max_bytes_env, &value)) {
446 max_bytes = static_cast<size_t>(value * 1024 * 1024);
447 }
448 const char* max_staleness_env = std::getenv(kMaxStaleness);
449 if (max_staleness_env && absl::SimpleAtoi(max_staleness_env, &value)) {
450 max_staleness = value;
451 }
452 TF_VLog(1, "GCS cache max size = %u ; block size = %u ; max staleness = %u",
453 max_bytes, block_size, max_staleness);
454
455 file_block_cache = std::make_unique<RamFileBlockCache>(
456 block_size, max_bytes, max_staleness,
457 [this](const std::string& filename, size_t offset, size_t buffer_size,
458 char* buffer, TF_Status* status) {
459 return LoadBufferFromGCS(filename, offset, buffer_size, buffer, this,
460 status);
461 });
462
463 uint64_t stat_cache_max_age = kStatCacheDefaultMaxAge;
464 size_t stat_cache_max_entries = kStatCacheDefaultMaxEntries;
465 const char* stat_cache_max_age_env = std::getenv(kStatCacheMaxAge);
466 if (stat_cache_max_age_env &&
467 absl::SimpleAtoi(stat_cache_max_age_env, &value)) {
468 stat_cache_max_age = value;
469 }
470 const char* stat_cache_max_entries_env = std::getenv(kStatCacheMaxEntries);
471 if (stat_cache_max_entries_env &&
472 absl::SimpleAtoi(stat_cache_max_entries_env, &value)) {
473 stat_cache_max_entries = static_cast<size_t>(value);
474 }
475 stat_cache = std::make_unique<ExpiringLRUCache<GcsFileStat>>(
476 stat_cache_max_age, stat_cache_max_entries);
477 }
478
GCSFile(google::cloud::storage::Client && gcs_client,bool compose,uint64_t block_size,size_t max_bytes,uint64_t max_staleness,uint64_t stat_cache_max_age,size_t stat_cache_max_entries)479 GCSFile::GCSFile(google::cloud::storage::Client&& gcs_client, bool compose,
480 uint64_t block_size, size_t max_bytes, uint64_t max_staleness,
481 uint64_t stat_cache_max_age, size_t stat_cache_max_entries)
482 : gcs_client(gcs_client),
483 compose(compose),
484 block_cache_lock(),
485 block_size(block_size) {
486 file_block_cache = std::make_unique<RamFileBlockCache>(
487 block_size, max_bytes, max_staleness,
488 [this](const std::string& filename, size_t offset, size_t buffer_size,
489 char* buffer, TF_Status* status) {
490 return LoadBufferFromGCS(filename, offset, buffer_size, buffer, this,
491 status);
492 });
493 stat_cache = std::make_unique<ExpiringLRUCache<GcsFileStat>>(
494 stat_cache_max_age, stat_cache_max_entries);
495 }
496
InitTest(TF_Filesystem * filesystem,bool compose,uint64_t block_size,size_t max_bytes,uint64_t max_staleness,uint64_t stat_cache_max_age,size_t stat_cache_max_entries,TF_Status * status)497 void InitTest(TF_Filesystem* filesystem, bool compose, uint64_t block_size,
498 size_t max_bytes, uint64_t max_staleness,
499 uint64_t stat_cache_max_age, size_t stat_cache_max_entries,
500 TF_Status* status) {
501 google::cloud::StatusOr<gcs::Client> client =
502 gcs::Client::CreateDefaultClient();
503 if (!client) {
504 TF_SetStatusFromGCSStatus(client.status(), status);
505 return;
506 }
507
508 filesystem->plugin_filesystem =
509 new GCSFile(std::move(client.value()), compose, block_size, max_bytes,
510 max_staleness, stat_cache_max_age, stat_cache_max_entries);
511 TF_SetStatus(status, TF_OK, "");
512 }
513
Init(TF_Filesystem * filesystem,TF_Status * status)514 void Init(TF_Filesystem* filesystem, TF_Status* status) {
515 google::cloud::StatusOr<gcs::Client> client =
516 gcs::Client::CreateDefaultClient();
517 if (!client) {
518 TF_SetStatusFromGCSStatus(client.status(), status);
519 return;
520 }
521
522 filesystem->plugin_filesystem = new GCSFile(std::move(client.value()));
523 TF_SetStatus(status, TF_OK, "");
524 }
525
Cleanup(TF_Filesystem * filesystem)526 void Cleanup(TF_Filesystem* filesystem) {
527 auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
528 delete gcs_file;
529 }
530
UncachedStatForObject(const std::string & bucket,const std::string & object,GcsFileStat * stat,gcs::Client * gcs_client,TF_Status * status)531 static void UncachedStatForObject(const std::string& bucket,
532 const std::string& object, GcsFileStat* stat,
533 gcs::Client* gcs_client, TF_Status* status) {
534 auto metadata = gcs_client->GetObjectMetadata(
535 bucket, object, gcs::Fields("generation,size,timeStorageClassUpdated"));
536 if (!metadata) return TF_SetStatusFromGCSStatus(metadata.status(), status);
537 stat->generation_number = metadata->generation();
538 stat->base.length = metadata->size();
539 stat->base.mtime_nsec =
540 metadata->time_storage_class_updated().time_since_epoch().count();
541 stat->base.is_directory = object.back() == '/';
542 TF_VLog(1,
543 "Stat of: gs://%s/%s -- length: %u generation: %u; mtime_nsec: %u;",
544 bucket.c_str(), object.c_str(), stat->base.length,
545 stat->generation_number, stat->base.mtime_nsec);
546 return TF_SetStatus(status, TF_OK, "");
547 }
548
549 // TODO(vnvo2409): Implement later
NewRandomAccessFile(const TF_Filesystem * filesystem,const char * path,TF_RandomAccessFile * file,TF_Status * status)550 void NewRandomAccessFile(const TF_Filesystem* filesystem, const char* path,
551 TF_RandomAccessFile* file, TF_Status* status) {
552 std::string bucket, object;
553 ParseGCSPath(path, false, &bucket, &object, status);
554 if (TF_GetCode(status) != TF_OK) return;
555
556 auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
557 bool is_cache_enabled;
558 {
559 absl::MutexLock l(&gcs_file->block_cache_lock);
560 is_cache_enabled = gcs_file->file_block_cache->IsCacheEnabled();
561 }
562 auto read_fn = [gcs_file, is_cache_enabled, bucket, object](
563 const std::string& path, uint64_t offset, size_t n,
564 char* buffer, TF_Status* status) -> int64_t {
565 int64_t read = 0;
566 if (is_cache_enabled) {
567 absl::ReaderMutexLock l(&gcs_file->block_cache_lock);
568 GcsFileStat stat;
569 gcs_file->stat_cache->LookupOrCompute(
570 path, &stat,
571 [gcs_file, bucket, object](const std::string& path, GcsFileStat* stat,
572 TF_Status* status) {
573 UncachedStatForObject(bucket, object, stat, &gcs_file->gcs_client,
574 status);
575 },
576 status);
577 if (TF_GetCode(status) != TF_OK) return -1;
578 if (!gcs_file->file_block_cache->ValidateAndUpdateFileSignature(
579 path, stat.generation_number)) {
580 TF_VLog(
581 1,
582 "File signature has been changed. Refreshing the cache. Path: %s",
583 path.c_str());
584 }
585 read = gcs_file->file_block_cache->Read(path, offset, n, buffer, status);
586 } else {
587 read = LoadBufferFromGCS(path, offset, n, buffer, gcs_file, status);
588 }
589 if (TF_GetCode(status) != TF_OK) return -1;
590 if (read < n)
591 TF_SetStatus(status, TF_OUT_OF_RANGE, "Read less bytes than requested");
592 else
593 TF_SetStatus(status, TF_OK, "");
594 return read;
595 };
596 file->plugin_file = new tf_random_access_file::GCSFile(
597 std::move(path), is_cache_enabled, gcs_file->block_size, read_fn);
598 TF_SetStatus(status, TF_OK, "");
599 }
600
NewWritableFile(const TF_Filesystem * filesystem,const char * path,TF_WritableFile * file,TF_Status * status)601 void NewWritableFile(const TF_Filesystem* filesystem, const char* path,
602 TF_WritableFile* file, TF_Status* status) {
603 std::string bucket, object;
604 ParseGCSPath(path, false, &bucket, &object, status);
605 if (TF_GetCode(status) != TF_OK) return;
606
607 auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
608 char* temp_file_name = TF_GetTempFileName("");
609 file->plugin_file = new tf_writable_file::GCSFile(
610 {std::move(bucket), std::move(object), &gcs_file->gcs_client,
611 TempFile(temp_file_name, std::ios::binary | std::ios::out), true,
612 (gcs_file->compose ? 0 : -1)});
613 // We are responsible for freeing the pointer returned by TF_GetTempFileName
614 free(temp_file_name);
615 TF_VLog(3, "GcsWritableFile: %s", path);
616 TF_SetStatus(status, TF_OK, "");
617 }
618
NewAppendableFile(const TF_Filesystem * filesystem,const char * path,TF_WritableFile * file,TF_Status * status)619 void NewAppendableFile(const TF_Filesystem* filesystem, const char* path,
620 TF_WritableFile* file, TF_Status* status) {
621 std::string bucket, object;
622 ParseGCSPath(path, false, &bucket, &object, status);
623 if (TF_GetCode(status) != TF_OK) return;
624
625 auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
626 char* temp_file_name_c_str = TF_GetTempFileName("");
627 std::string temp_file_name(temp_file_name_c_str); // To prevent memory-leak
628 free(temp_file_name_c_str);
629
630 if (!gcs_file->compose) {
631 auto gcs_status =
632 gcs_file->gcs_client.DownloadToFile(bucket, object, temp_file_name);
633 TF_SetStatusFromGCSStatus(gcs_status, status);
634 auto status_code = TF_GetCode(status);
635 if (status_code != TF_OK && status_code != TF_NOT_FOUND) return;
636 // If this file does not exist on server, we will need to sync it.
637 bool sync_need = (status_code == TF_NOT_FOUND);
638 file->plugin_file = new tf_writable_file::GCSFile(
639 {std::move(bucket), std::move(object), &gcs_file->gcs_client,
640 TempFile(temp_file_name, std::ios::binary | std::ios::app), sync_need,
641 -1});
642 } else {
643 // If compose is true, we do not download anything.
644 // Instead we only check if this file exists on server or not.
645 auto metadata = gcs_file->gcs_client.GetObjectMetadata(bucket, object,
646 gcs::Fields("size"));
647 TF_SetStatusFromGCSStatus(metadata.status(), status);
648 if (TF_GetCode(status) == TF_OK) {
649 file->plugin_file = new tf_writable_file::GCSFile(
650 {std::move(bucket), std::move(object), &gcs_file->gcs_client,
651 TempFile(temp_file_name, std::ios::binary | std::ios::trunc), false,
652 static_cast<int64_t>(metadata->size())});
653 } else if (TF_GetCode(status) == TF_NOT_FOUND) {
654 file->plugin_file = new tf_writable_file::GCSFile(
655 {std::move(bucket), std::move(object), &gcs_file->gcs_client,
656 TempFile(temp_file_name, std::ios::binary | std::ios::trunc), true,
657 0});
658 } else {
659 return;
660 }
661 }
662 TF_VLog(3, "GcsWritableFile: %s with existing file %s", path,
663 temp_file_name.c_str());
664 TF_SetStatus(status, TF_OK, "");
665 }
666
667 // TODO(vnvo2409): We could download into a local temporary file and use
668 // memory-mapping.
NewReadOnlyMemoryRegionFromFile(const TF_Filesystem * filesystem,const char * path,TF_ReadOnlyMemoryRegion * region,TF_Status * status)669 void NewReadOnlyMemoryRegionFromFile(const TF_Filesystem* filesystem,
670 const char* path,
671 TF_ReadOnlyMemoryRegion* region,
672 TF_Status* status) {
673 std::string bucket, object;
674 ParseGCSPath(path, false, &bucket, &object, status);
675 if (TF_GetCode(status) != TF_OK) return;
676
677 auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
678 auto metadata = gcs_file->gcs_client.GetObjectMetadata(bucket, object,
679 gcs::Fields("size"));
680 if (!metadata) {
681 TF_SetStatusFromGCSStatus(metadata.status(), status);
682 return;
683 }
684
685 TF_RandomAccessFile reader;
686 NewRandomAccessFile(filesystem, path, &reader, status);
687 if (TF_GetCode(status) != TF_OK) return;
688 char* buffer = static_cast<char*>(plugin_memory_allocate(metadata->size()));
689 int64_t read =
690 tf_random_access_file::Read(&reader, 0, metadata->size(), buffer, status);
691 tf_random_access_file::Cleanup(&reader);
692 if (TF_GetCode(status) != TF_OK) return;
693
694 if (read > 0 && buffer) {
695 region->plugin_memory_region =
696 new tf_read_only_memory_region::GCSMemoryRegion(
697 {buffer, static_cast<uint64_t>(read)});
698 TF_SetStatus(status, TF_OK, "");
699 } else if (read == 0) {
700 TF_SetStatus(status, TF_INVALID_ARGUMENT, "File is empty");
701 }
702 }
703
StatForObject(GCSFile * gcs_file,const std::string & path,const std::string & bucket,const std::string & object,GcsFileStat * stat,TF_Status * status)704 static void StatForObject(GCSFile* gcs_file, const std::string& path,
705 const std::string& bucket, const std::string& object,
706 GcsFileStat* stat, TF_Status* status) {
707 if (object.empty())
708 return TF_SetStatus(
709 status, TF_INVALID_ARGUMENT,
710 absl::StrCat("'object' must be a non-empty string. (File: ", path, ")")
711 .c_str());
712 TF_SetStatus(status, TF_OK, "");
713 gcs_file->stat_cache->LookupOrCompute(
714 path, stat,
715 [gcs_file, bucket, object](const std::string& path, GcsFileStat* stat,
716 TF_Status* status) {
717 UncachedStatForObject(bucket, object, stat, &gcs_file->gcs_client,
718 status);
719 },
720 status);
721 }
722
ObjectExists(GCSFile * gcs_file,const std::string & path,const std::string & bucket,const std::string & object,TF_Status * status)723 static bool ObjectExists(GCSFile* gcs_file, const std::string& path,
724 const std::string& bucket, const std::string& object,
725 TF_Status* status) {
726 GcsFileStat stat;
727 StatForObject(gcs_file, path, bucket, object, &stat, status);
728 if (TF_GetCode(status) != TF_OK && TF_GetCode(status) != TF_NOT_FOUND)
729 return false;
730 if (TF_GetCode(status) == TF_NOT_FOUND) {
731 TF_SetStatus(status, TF_OK, "");
732 return false;
733 }
734 return !stat.base.is_directory;
735 }
736
BucketExists(GCSFile * gcs_file,const std::string & bucket,TF_Status * status)737 static bool BucketExists(GCSFile* gcs_file, const std::string& bucket,
738 TF_Status* status) {
739 auto metadata =
740 gcs_file->gcs_client.GetBucketMetadata(bucket, gcs::Fields(""));
741 TF_SetStatusFromGCSStatus(metadata.status(), status);
742 if (TF_GetCode(status) != TF_OK && TF_GetCode(status) != TF_NOT_FOUND)
743 return false;
744 if (TF_GetCode(status) == TF_NOT_FOUND) {
745 TF_SetStatus(status, TF_OK, "");
746 return false;
747 }
748 return true;
749 }
750
GetChildrenBounded(GCSFile * gcs_file,std::string dir,uint64_t max_results,bool recursive,bool include_self_directory_marker,TF_Status * status)751 static std::vector<std::string> GetChildrenBounded(
752 GCSFile* gcs_file, std::string dir, uint64_t max_results, bool recursive,
753 bool include_self_directory_marker, TF_Status* status) {
754 std::string bucket, prefix;
755 MaybeAppendSlash(&dir);
756 ParseGCSPath(dir, true, &bucket, &prefix, status);
757
758 std::vector<std::string> result;
759 uint64_t count = 0;
760 std::string delimiter = recursive ? "" : "/";
761
762 for (auto&& item : gcs_file->gcs_client.ListObjectsAndPrefixes(
763 bucket, gcs::Prefix(prefix), gcs::Delimiter(delimiter),
764 gcs::Fields("items(name),prefixes"))) {
765 if (count == max_results) {
766 TF_SetStatus(status, TF_OK, "");
767 return result;
768 }
769 if (!item) {
770 TF_SetStatusFromGCSStatus(item.status(), status);
771 return result;
772 }
773 auto value = *std::move(item);
774 std::string children = absl::holds_alternative<std::string>(value)
775 ? absl::get<std::string>(value)
776 : absl::get<gcs::ObjectMetadata>(value).name();
777 auto pos = children.find(prefix);
778 if (pos != 0) {
779 TF_SetStatus(status, TF_INTERNAL,
780 absl::StrCat("Unexpected response: the returned file name ",
781 children, " doesn't match the prefix ", prefix)
782 .c_str());
783 return result;
784 }
785 children.erase(0, prefix.length());
786 if (!children.empty() || include_self_directory_marker) {
787 result.emplace_back(children);
788 }
789 ++count;
790 }
791
792 return result;
793 }
794
FolderExists(GCSFile * gcs_file,std::string dir,TF_Status * status)795 static bool FolderExists(GCSFile* gcs_file, std::string dir,
796 TF_Status* status) {
797 ExpiringLRUCache<GcsFileStat>::ComputeFunc compute_func =
798 [gcs_file](const std::string& dir, GcsFileStat* stat, TF_Status* status) {
799 auto children =
800 GetChildrenBounded(gcs_file, dir, 1, true, true, status);
801 if (TF_GetCode(status) != TF_OK) return;
802 if (!children.empty()) {
803 stat->base = {0, 0, true};
804 return TF_SetStatus(status, TF_OK, "");
805 } else {
806 return TF_SetStatus(status, TF_INVALID_ARGUMENT, "Not a directory!");
807 }
808 };
809 GcsFileStat stat;
810 MaybeAppendSlash(&dir);
811 gcs_file->stat_cache->LookupOrCompute(dir, &stat, compute_func, status);
812 if (TF_GetCode(status) != TF_OK && TF_GetCode(status) != TF_INVALID_ARGUMENT)
813 return false;
814 if (TF_GetCode(status) == TF_INVALID_ARGUMENT) {
815 TF_SetStatus(status, TF_OK, "");
816 return false;
817 }
818 return true;
819 }
820
ClearFileCaches(GCSFile * gcs_file,const std::string & path)821 static void ClearFileCaches(GCSFile* gcs_file, const std::string& path) {
822 absl::ReaderMutexLock l(&gcs_file->block_cache_lock);
823 gcs_file->file_block_cache->RemoveFile(path);
824 gcs_file->stat_cache->Delete(path);
825 }
826
PathExists(const TF_Filesystem * filesystem,const char * path,TF_Status * status)827 void PathExists(const TF_Filesystem* filesystem, const char* path,
828 TF_Status* status) {
829 std::string bucket, object;
830 ParseGCSPath(path, true, &bucket, &object, status);
831 if (TF_GetCode(status) != TF_OK) return;
832
833 auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
834 if (object.empty()) {
835 bool result = BucketExists(gcs_file, bucket, status);
836 if (result) return TF_SetStatus(status, TF_OK, "");
837 }
838
839 GcsFileStat stat;
840 StatForObject(gcs_file, path, bucket, object, &stat, status);
841 if (TF_GetCode(status) != TF_NOT_FOUND) return;
842
843 bool result = FolderExists(gcs_file, path, status);
844 if (TF_GetCode(status) != TF_OK || (TF_GetCode(status) == TF_OK && result))
845 return;
846 return TF_SetStatus(
847 status, TF_NOT_FOUND,
848 absl::StrCat("The path ", path, " does not exist.").c_str());
849 }
850
CreateDir(const TF_Filesystem * filesystem,const char * path,TF_Status * status)851 void CreateDir(const TF_Filesystem* filesystem, const char* path,
852 TF_Status* status) {
853 std::string dir = path;
854 MaybeAppendSlash(&dir);
855 TF_VLog(3,
856 "CreateDir: creating directory with path: %s and "
857 "path_with_slash: %s",
858 path, dir.c_str());
859 std::string bucket, object;
860 ParseGCSPath(dir, true, &bucket, &object, status);
861 if (TF_GetCode(status) != TF_OK) return;
862 auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
863 if (object.empty()) {
864 bool is_directory = BucketExists(gcs_file, bucket, status);
865 if (TF_GetCode(status) != TF_OK) return;
866 if (!is_directory)
867 TF_SetStatus(status, TF_NOT_FOUND,
868 absl::StrCat("The specified bucket ", dir, " was not found.")
869 .c_str());
870 return;
871 }
872
873 PathExists(filesystem, dir.c_str(), status);
874 if (TF_GetCode(status) == TF_OK) {
875 // Use the original name for a correct error here.
876 TF_VLog(3, "CreateDir: directory already exists, not uploading %s", path);
877 return TF_SetStatus(status, TF_ALREADY_EXISTS, path);
878 }
879
880 auto metadata = gcs_file->gcs_client.InsertObject(
881 bucket, object, "",
882 // Adding this parameter means HTTP_CODE_PRECONDITION_FAILED
883 // will be returned if the object already exists, so avoid reuploading.
884 gcs::IfGenerationMatch(0), gcs::Fields(""));
885 TF_SetStatusFromGCSStatus(metadata.status(), status);
886 if (TF_GetCode(status) == TF_FAILED_PRECONDITION)
887 TF_SetStatus(status, TF_ALREADY_EXISTS, path);
888 }
889
// TODO(vnvo2409): `RecursivelyCreateDir` should use `CreateDir` instead of the
// default implementation, because we could then create an empty object whose
// key is equal to `path`, which Google Cloud Console automatically displays
// as a directory tree.
894
DeleteFile(const TF_Filesystem * filesystem,const char * path,TF_Status * status)895 void DeleteFile(const TF_Filesystem* filesystem, const char* path,
896 TF_Status* status) {
897 std::string bucket, object;
898 ParseGCSPath(path, false, &bucket, &object, status);
899 if (TF_GetCode(status) != TF_OK) return;
900 auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
901 auto gcs_status = gcs_file->gcs_client.DeleteObject(bucket, object);
902 TF_SetStatusFromGCSStatus(gcs_status, status);
903 if (TF_GetCode(status) == TF_OK) ClearFileCaches(gcs_file, path);
904 }
905
906 // Checks that the directory is empty (i.e no objects with this prefix exist).
907 // Deletes the GCS directory marker if it exists.
DeleteDir(const TF_Filesystem * filesystem,const char * path,TF_Status * status)908 void DeleteDir(const TF_Filesystem* filesystem, const char* path,
909 TF_Status* status) {
910 // A directory is considered empty either if there are no matching objects
911 // with the corresponding name prefix or if there is exactly one matching
912 // object and it is the directory marker. Therefore we need to retrieve
913 // at most two children for the prefix to detect if a directory is empty.
914 auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
915 auto childrens = GetChildrenBounded(gcs_file, path, 2, true, true, status);
916 if (TF_GetCode(status) != TF_OK) return;
917 if (childrens.size() > 1 || (childrens.size() == 1 && !childrens[0].empty()))
918 return TF_SetStatus(status, TF_FAILED_PRECONDITION,
919 "Cannot delete a non-empty directory.");
920 if (childrens.size() == 1 && childrens[0].empty()) {
921 // This is the directory marker object. Delete it.
922 std::string dir = path;
923 MaybeAppendSlash(&dir);
924 DeleteFile(filesystem, dir.c_str(), status);
925 return;
926 }
927 TF_SetStatus(status, TF_OK, "");
928 }
929
CopyFile(const TF_Filesystem * filesystem,const char * src,const char * dst,TF_Status * status)930 void CopyFile(const TF_Filesystem* filesystem, const char* src, const char* dst,
931 TF_Status* status) {
932 std::string bucket_src, object_src;
933 ParseGCSPath(src, false, &bucket_src, &object_src, status);
934 if (TF_GetCode(status) != TF_OK) return;
935
936 std::string bucket_dst, object_dst;
937 ParseGCSPath(dst, false, &bucket_dst, &object_dst, status);
938 if (TF_GetCode(status) != TF_OK) return;
939
940 auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
941 auto metadata = gcs_file->gcs_client.RewriteObjectBlocking(
942 bucket_src, object_src, bucket_dst, object_dst,
943 gcs::Fields("done,rewriteToken"));
944 TF_SetStatusFromGCSStatus(metadata.status(), status);
945 }
946
IsDirectory(const TF_Filesystem * filesystem,const char * path,TF_Status * status)947 bool IsDirectory(const TF_Filesystem* filesystem, const char* path,
948 TF_Status* status) {
949 std::string bucket, object;
950 ParseGCSPath(path, true, &bucket, &object, status);
951 if (TF_GetCode(status) != TF_OK) return false;
952
953 auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
954 if (object.empty()) {
955 bool result = BucketExists(gcs_file, bucket, status);
956 if (TF_GetCode(status) != TF_OK) return false;
957 if (!result)
958 TF_SetStatus(
959 status, TF_NOT_FOUND,
960 absl::StrCat("The specified bucket gs://", bucket, " was not found.")
961 .c_str());
962 return result;
963 }
964
965 bool is_folder = FolderExists(gcs_file, path, status);
966 if (TF_GetCode(status) != TF_OK) return false;
967 if (is_folder) return true;
968
969 bool is_object = ObjectExists(gcs_file, path, bucket, object, status);
970 if (TF_GetCode(status) != TF_OK) return false;
971 if (is_object) {
972 TF_SetStatus(
973 status, TF_FAILED_PRECONDITION,
974 absl::StrCat("The specified path ", path, " is not a directory.")
975 .c_str());
976 return false;
977 }
978 TF_SetStatus(status, TF_NOT_FOUND,
979 absl::StrCat("The path ", path, " does not exist.").c_str());
980 return false;
981 }
982
RenameObject(const TF_Filesystem * filesystem,const std::string & src,const std::string & dst,TF_Status * status)983 static void RenameObject(const TF_Filesystem* filesystem,
984 const std::string& src, const std::string& dst,
985 TF_Status* status) {
986 TF_VLog(3, "RenameObject: started %s to %s", src.c_str(), dst.c_str());
987 std::string bucket_src, object_src;
988 ParseGCSPath(src, false, &bucket_src, &object_src, status);
989 if (TF_GetCode(status) != TF_OK) return;
990
991 std::string bucket_dst, object_dst;
992 ParseGCSPath(dst, false, &bucket_dst, &object_dst, status);
993 if (TF_GetCode(status) != TF_OK) return;
994
995 auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
996 auto metadata = gcs_file->gcs_client.RewriteObjectBlocking(
997 bucket_src, object_src, bucket_dst, object_dst,
998 gcs::Fields("done,rewriteToken"));
999 TF_SetStatusFromGCSStatus(metadata.status(), status);
1000 if (TF_GetCode(status) != TF_OK) return;
1001 TF_VLog(3, "RenameObject: finished %s to %s", src.c_str(), dst.c_str());
1002
1003 ClearFileCaches(gcs_file, dst);
1004 DeleteFile(filesystem, src.c_str(), status);
1005 }
1006
RenameFile(const TF_Filesystem * filesystem,const char * src,const char * dst,TF_Status * status)1007 void RenameFile(const TF_Filesystem* filesystem, const char* src,
1008 const char* dst, TF_Status* status) {
1009 if (!IsDirectory(filesystem, src, status)) {
1010 if (TF_GetCode(status) == TF_FAILED_PRECONDITION) {
1011 TF_SetStatus(status, TF_OK, "");
1012 RenameObject(filesystem, src, dst, status);
1013 }
1014 return;
1015 }
1016
1017 auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
1018 std::vector<std::string> childrens =
1019 GetChildrenBounded(gcs_file, src, UINT64_MAX, true, true, status);
1020 if (TF_GetCode(status) != TF_OK) return;
1021
1022 std::string src_dir = src;
1023 std::string dst_dir = dst;
1024 MaybeAppendSlash(&src_dir);
1025 MaybeAppendSlash(&dst_dir);
1026 for (const std::string& children : childrens) {
1027 RenameObject(filesystem, src_dir + children, dst_dir + children, status);
1028 if (TF_GetCode(status) != TF_OK) return;
1029 }
1030 TF_SetStatus(status, TF_OK, "");
1031 }
1032
DeleteRecursively(const TF_Filesystem * filesystem,const char * path,uint64_t * undeleted_files,uint64_t * undeleted_dirs,TF_Status * status)1033 void DeleteRecursively(const TF_Filesystem* filesystem, const char* path,
1034 uint64_t* undeleted_files, uint64_t* undeleted_dirs,
1035 TF_Status* status) {
1036 if (!undeleted_files || !undeleted_dirs)
1037 return TF_SetStatus(
1038 status, TF_INTERNAL,
1039 "'undeleted_files' and 'undeleted_dirs' cannot be nullptr.");
1040 *undeleted_files = 0;
1041 *undeleted_dirs = 0;
1042 if (!IsDirectory(filesystem, path, status)) {
1043 *undeleted_dirs = 1;
1044 return;
1045 }
1046 auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
1047 std::vector<std::string> childrens =
1048 GetChildrenBounded(gcs_file, path, UINT64_MAX, true, true, status);
1049 if (TF_GetCode(status) != TF_OK) return;
1050
1051 std::string dir = path;
1052 MaybeAppendSlash(&dir);
1053 for (const std::string& children : childrens) {
1054 const std::string& full_path = dir + children;
1055 DeleteFile(filesystem, full_path.c_str(), status);
1056 if (TF_GetCode(status) != TF_OK) {
1057 if (IsDirectory(filesystem, full_path.c_str(), status))
1058 // The object is a directory marker.
1059 (*undeleted_dirs)++;
1060 else
1061 (*undeleted_files)++;
1062 }
1063 }
1064 }
1065
GetChildren(const TF_Filesystem * filesystem,const char * path,char *** entries,TF_Status * status)1066 int GetChildren(const TF_Filesystem* filesystem, const char* path,
1067 char*** entries, TF_Status* status) {
1068 auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
1069 std::vector<std::string> childrens =
1070 GetChildrenBounded(gcs_file, path, UINT64_MAX, false, false, status);
1071 if (TF_GetCode(status) != TF_OK) return -1;
1072
1073 int num_entries = childrens.size();
1074 *entries = static_cast<char**>(
1075 plugin_memory_allocate(num_entries * sizeof((*entries)[0])));
1076 for (int i = 0; i < num_entries; i++)
1077 (*entries)[i] = strdup(childrens[i].c_str());
1078 TF_SetStatus(status, TF_OK, "");
1079 return num_entries;
1080 }
1081
Stat(const TF_Filesystem * filesystem,const char * path,TF_FileStatistics * stats,TF_Status * status)1082 void Stat(const TF_Filesystem* filesystem, const char* path,
1083 TF_FileStatistics* stats, TF_Status* status) {
1084 std::string bucket, object;
1085 ParseGCSPath(path, true, &bucket, &object, status);
1086 if (TF_GetCode(status) != TF_OK) return;
1087
1088 auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
1089 if (object.empty()) {
1090 auto bucket_metadata =
1091 gcs_file->gcs_client.GetBucketMetadata(bucket, gcs::Fields(""));
1092 TF_SetStatusFromGCSStatus(bucket_metadata.status(), status);
1093 if (TF_GetCode(status) == TF_OK) {
1094 stats->is_directory = true;
1095 stats->length = 0;
1096 stats->mtime_nsec = 0;
1097 }
1098 return;
1099 }
1100 if (IsDirectory(filesystem, path, status)) {
1101 stats->is_directory = true;
1102 stats->length = 0;
1103 stats->mtime_nsec = 0;
1104 return TF_SetStatus(status, TF_OK, "");
1105 }
1106 if (TF_GetCode(status) == TF_FAILED_PRECONDITION) {
1107 auto metadata = gcs_file->gcs_client.GetObjectMetadata(
1108 bucket, object, gcs::Fields("size,timeStorageClassUpdated"));
1109 if (metadata) {
1110 stats->is_directory = false;
1111 stats->length = metadata.value().size();
1112 stats->mtime_nsec = metadata.value()
1113 .time_storage_class_updated()
1114 .time_since_epoch()
1115 .count();
1116 }
1117 TF_SetStatusFromGCSStatus(metadata.status(), status);
1118 }
1119 }
1120
GetFileSize(const TF_Filesystem * filesystem,const char * path,TF_Status * status)1121 int64_t GetFileSize(const TF_Filesystem* filesystem, const char* path,
1122 TF_Status* status) {
1123 // Only validate the name.
1124 std::string bucket, object;
1125 ParseGCSPath(path, false, &bucket, &object, status);
1126 if (TF_GetCode(status) != TF_OK) return -1;
1127
1128 TF_FileStatistics stat;
1129 Stat(filesystem, path, &stat, status);
1130 return stat.length;
1131 }
1132
// Returns a `strdup` copy of `uri`, unchanged. `filesystem` is not used.
static char* TranslateName(const TF_Filesystem* filesystem, const char* uri) {
  char* translated = strdup(uri);
  return translated;
}
1136
FlushCaches(const TF_Filesystem * filesystem)1137 static void FlushCaches(const TF_Filesystem* filesystem) {
1138 auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
1139 absl::ReaderMutexLock l(&gcs_file->block_cache_lock);
1140 gcs_file->file_block_cache->Flush();
1141 gcs_file->stat_cache->Clear();
1142 }
1143
1144 } // namespace tf_gcs_filesystem
1145
ProvideFilesystemSupportFor(TF_FilesystemPluginOps * ops,const char * uri)1146 static void ProvideFilesystemSupportFor(TF_FilesystemPluginOps* ops,
1147 const char* uri) {
1148 TF_SetFilesystemVersionMetadata(ops);
1149 ops->scheme = strdup(uri);
1150
1151 ops->random_access_file_ops = static_cast<TF_RandomAccessFileOps*>(
1152 plugin_memory_allocate(TF_RANDOM_ACCESS_FILE_OPS_SIZE));
1153 ops->random_access_file_ops->cleanup = tf_random_access_file::Cleanup;
1154 ops->random_access_file_ops->read = tf_random_access_file::Read;
1155
1156 ops->writable_file_ops = static_cast<TF_WritableFileOps*>(
1157 plugin_memory_allocate(TF_WRITABLE_FILE_OPS_SIZE));
1158 ops->writable_file_ops->cleanup = tf_writable_file::Cleanup;
1159
1160 ops->read_only_memory_region_ops = static_cast<TF_ReadOnlyMemoryRegionOps*>(
1161 plugin_memory_allocate(TF_READ_ONLY_MEMORY_REGION_OPS_SIZE));
1162 ops->read_only_memory_region_ops->cleanup =
1163 tf_read_only_memory_region::Cleanup;
1164 ops->read_only_memory_region_ops->data = tf_read_only_memory_region::Data;
1165 ops->read_only_memory_region_ops->length = tf_read_only_memory_region::Length;
1166
1167 ops->filesystem_ops = static_cast<TF_FilesystemOps*>(
1168 plugin_memory_allocate(TF_FILESYSTEM_OPS_SIZE));
1169 ops->filesystem_ops->init = tf_gcs_filesystem::Init;
1170 ops->filesystem_ops->cleanup = tf_gcs_filesystem::Cleanup;
1171 ops->filesystem_ops->new_random_access_file =
1172 tf_gcs_filesystem::NewRandomAccessFile;
1173 ops->filesystem_ops->new_writable_file = tf_gcs_filesystem::NewWritableFile;
1174 ops->filesystem_ops->new_appendable_file =
1175 tf_gcs_filesystem::NewAppendableFile;
1176 ops->filesystem_ops->new_read_only_memory_region_from_file =
1177 tf_gcs_filesystem::NewReadOnlyMemoryRegionFromFile;
1178 ops->filesystem_ops->create_dir = tf_gcs_filesystem::CreateDir;
1179 ops->filesystem_ops->delete_file = tf_gcs_filesystem::DeleteFile;
1180 ops->filesystem_ops->delete_dir = tf_gcs_filesystem::DeleteDir;
1181 ops->filesystem_ops->delete_recursively =
1182 tf_gcs_filesystem::DeleteRecursively;
1183 ops->filesystem_ops->copy_file = tf_gcs_filesystem::CopyFile;
1184 ops->filesystem_ops->path_exists = tf_gcs_filesystem::PathExists;
1185 ops->filesystem_ops->is_directory = tf_gcs_filesystem::IsDirectory;
1186 ops->filesystem_ops->stat = tf_gcs_filesystem::Stat;
1187 ops->filesystem_ops->get_children = tf_gcs_filesystem::GetChildren;
1188 ops->filesystem_ops->translate_name = tf_gcs_filesystem::TranslateName;
1189 ops->filesystem_ops->flush_caches = tf_gcs_filesystem::FlushCaches;
1190 }
1191
TF_InitPlugin(TF_FilesystemPluginInfo * info)1192 void TF_InitPlugin(TF_FilesystemPluginInfo* info) {
1193 info->plugin_memory_allocate = plugin_memory_allocate;
1194 info->plugin_memory_free = plugin_memory_free;
1195 info->num_schemes = 1;
1196 info->ops = static_cast<TF_FilesystemPluginOps*>(
1197 plugin_memory_allocate(info->num_schemes * sizeof(info->ops[0])));
1198 ProvideFilesystemSupportFor(&info->ops[0], "gs");
1199 }
1200