xref: /aosp_15_r20/external/tensorflow/tensorflow/c/experimental/filesystem/plugins/gcs/gcs_filesystem.cc (revision b6fb3261f9314811a0f4371741dbb8839866f948)
1 /* Copyright 2020 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
#include "tensorflow/c/experimental/filesystem/plugins/gcs/gcs_filesystem.h"

#include <stdlib.h>
#include <string.h>

#include <chrono>

#include "absl/strings/numbers.h"
#include "absl/strings/str_cat.h"
#include "absl/types/variant.h"
#include "google/cloud/storage/client.h"
#include "tensorflow/c/env.h"
#include "tensorflow/c/experimental/filesystem/plugins/gcs/gcs_helper.h"
#include "tensorflow/c/logging.h"
#include "tensorflow/c/tf_status.h"
28 
29 // Implementation of a filesystem for GCS environments.
30 // This filesystem will support `gs://` URI schemes.
31 namespace gcs = google::cloud::storage;
32 
// The environment variable that overrides the block size for aligned reads from
// GCS. Specified in MB (e.g. "16" = 16 x 1024 x 1024 = 16777216 bytes).
constexpr char kBlockSize[] = "GCS_READ_CACHE_BLOCK_SIZE_MB";
// Block size, in bytes, used when `GCS_READ_CACHE_BLOCK_SIZE_MB` is unset or
// cannot be parsed as an integer (64 x 1024 x 1024 bytes).
constexpr size_t kDefaultBlockSize = 64 * 1024 * 1024;
// The environment variable that overrides the max size of the LRU cache of
// blocks read from GCS. Specified in MB.
constexpr char kMaxCacheSize[] = "GCS_READ_CACHE_MAX_SIZE_MB";
// Max cache size, in bytes, used when `GCS_READ_CACHE_MAX_SIZE_MB` is unset or
// cannot be parsed.
// NOTE(review): 0 presumably disables the block cache -- confirm against
// `RamFileBlockCache::IsCacheEnabled`.
constexpr size_t kDefaultMaxCacheSize = 0;
// The environment variable that overrides the maximum staleness of cached file
// contents. Once any block of a file reaches this staleness, all cached blocks
// will be evicted on the next read.
constexpr char kMaxStaleness[] = "GCS_READ_CACHE_MAX_STALENESS";
// Max staleness used when `GCS_READ_CACHE_MAX_STALENESS` is unset or cannot be
// parsed.
constexpr uint64_t kDefaultMaxStaleness = 0;

// The environment variable that overrides the maximum age of entries in the
// Stat cache.
// NOTE(review): the unit is presumably seconds -- confirm against
// `ExpiringLRUCache`.
constexpr char kStatCacheMaxAge[] = "GCS_STAT_CACHE_MAX_AGE";
// Max age used when `GCS_STAT_CACHE_MAX_AGE` is unset or cannot be parsed.
constexpr uint64_t kStatCacheDefaultMaxAge = 5;
// The environment variable that overrides the maximum number of entries in the
// Stat cache.
constexpr char kStatCacheMaxEntries[] = "GCS_STAT_CACHE_MAX_ENTRIES";
// Entry limit used when `GCS_STAT_CACHE_MAX_ENTRIES` is unset or cannot be
// parsed.
constexpr size_t kStatCacheDefaultMaxEntries = 1024;

// How to upload new data when Flush() is called multiple times.
// By default the entire file is reuploaded.
constexpr char kAppendMode[] = "GCS_APPEND_MODE";
// If GCS_APPEND_MODE=compose then instead the new data is uploaded to a
// temporary object and composed with the original object. This is disabled by
// default as the multiple API calls required add a risk of stranding temporary
// objects.
constexpr char kComposeAppend[] = "compose";
62 
63 // We can cast `google::cloud::StatusCode` to `TF_Code` because they have the
64 // same integer values. See
65 // https://github.com/googleapis/google-cloud-cpp/blob/6c09cbfa0160bc046e5509b4dd2ab4b872648b4a/google/cloud/status.h#L32-L52
TF_SetStatusFromGCSStatus(const google::cloud::Status & gcs_status,TF_Status * status)66 static inline void TF_SetStatusFromGCSStatus(
67     const google::cloud::Status& gcs_status, TF_Status* status) {
68   TF_SetStatus(status, static_cast<TF_Code>(gcs_status.code()),
69                gcs_status.message().c_str());
70 }
71 
// Returns `size` zero-initialized bytes obtained from `calloc`. The result
// must be released with `plugin_memory_free`.
static void* plugin_memory_allocate(size_t size) {
  void* zeroed_block = calloc(1, size);
  return zeroed_block;
}
// Releases memory previously returned by `plugin_memory_allocate`. Forwards
// straight to `free`, so a null pointer is accepted.
static void plugin_memory_free(void* ptr) {
  free(ptr);
}
74 
ParseGCSPath(const std::string & fname,bool object_empty_ok,std::string * bucket,std::string * object,TF_Status * status)75 void ParseGCSPath(const std::string& fname, bool object_empty_ok,
76                   std::string* bucket, std::string* object, TF_Status* status) {
77   size_t scheme_end = fname.find("://") + 2;
78   if (fname.substr(0, scheme_end + 1) != "gs://") {
79     TF_SetStatus(status, TF_INVALID_ARGUMENT,
80                  "GCS path doesn't start with 'gs://'.");
81     return;
82   }
83 
84   size_t bucket_end = fname.find('/', scheme_end + 1);
85   if (bucket_end == std::string::npos) {
86     TF_SetStatus(status, TF_INVALID_ARGUMENT,
87                  "GCS path doesn't contain a bucket name.");
88     return;
89   }
90 
91   *bucket = fname.substr(scheme_end + 1, bucket_end - scheme_end - 1);
92   *object = fname.substr(bucket_end + 1);
93 
94   if (object->empty() && !object_empty_ok) {
95     TF_SetStatus(status, TF_INVALID_ARGUMENT,
96                  "GCS path doesn't contain an object name.");
97   }
98 }
99 
/// Guarantees that `*name` ends with '/'. An empty string becomes "/"; a name
/// that already ends with a slash is left untouched.
static void MaybeAppendSlash(std::string* name) {
  const bool already_terminated = !name->empty() && name->back() == '/';
  if (already_terminated) return;
  name->append(1, '/');
}
107 
108 // A helper function to actually read the data from GCS.
LoadBufferFromGCS(const std::string & path,size_t offset,size_t buffer_size,char * buffer,tf_gcs_filesystem::GCSFile * gcs_file,TF_Status * status)109 static int64_t LoadBufferFromGCS(const std::string& path, size_t offset,
110                                  size_t buffer_size, char* buffer,
111                                  tf_gcs_filesystem::GCSFile* gcs_file,
112                                  TF_Status* status) {
113   std::string bucket, object;
114   ParseGCSPath(path, false, &bucket, &object, status);
115   if (TF_GetCode(status) != TF_OK) return -1;
116   auto stream = gcs_file->gcs_client.ReadObject(
117       bucket, object, gcs::ReadRange(offset, offset + buffer_size));
118   TF_SetStatusFromGCSStatus(stream.status(), status);
119   if ((TF_GetCode(status) != TF_OK) &&
120       (TF_GetCode(status) != TF_OUT_OF_RANGE)) {
121     return -1;
122   }
123   int64_t read;
124   auto content_length = stream.headers().find("content-length");
125   if (content_length == stream.headers().end()) {
126     // When we read a file with offset that is bigger than the actual file size.
127     // GCS will return an empty header (e.g no `content-length` header). In this
128     // case, we will set read to `0` and continue.
129     read = 0;
130   } else if (!absl::SimpleAtoi(content_length->second, &read)) {
131     TF_SetStatus(status, TF_UNKNOWN, "Could not get content-length header");
132     return -1;
133   }
134   // `TF_OUT_OF_RANGE` isn't considered as an error. So we clear it here.
135   TF_SetStatus(status, TF_OK, "");
136   TF_VLog(1, "Successful read of %s @ %u of size: %u", path.c_str(), offset,
137           read);
138   stream.read(buffer, read);
139   read = stream.gcount();
140   if (read < buffer_size) {
141     // Check stat cache to see if we encountered an interrupted read.
142     tf_gcs_filesystem::GcsFileStat stat;
143     if (gcs_file->stat_cache->Lookup(path, &stat)) {
144       if (offset + read < stat.base.length) {
145         TF_SetStatus(status, TF_INTERNAL,
146                      absl::StrCat("File contents are inconsistent for file: ",
147                                   path, " @ ", offset)
148                          .c_str());
149       }
150       TF_VLog(2, "Successful integrity check for: %s @ %u", path.c_str(),
151               offset);
152     }
153   }
154   return read;
155 }
156 
157 // SECTION 1. Implementation for `TF_RandomAccessFile`
158 // ----------------------------------------------------------------------------
159 namespace tf_random_access_file {
160 using ReadFn =
161     std::function<int64_t(const std::string& path, uint64_t offset, size_t n,
162                           char* buffer, TF_Status* status)>;
163 typedef struct GCSFile {
164   const std::string path;
165   const bool is_cache_enable;
166   const uint64_t buffer_size;
167   ReadFn read_fn;
168   absl::Mutex buffer_mutex;
169   uint64_t buffer_start ABSL_GUARDED_BY(buffer_mutex);
170   bool buffer_end_is_past_eof ABSL_GUARDED_BY(buffer_mutex);
171   std::string buffer ABSL_GUARDED_BY(buffer_mutex);
172 
GCSFiletf_random_access_file::GCSFile173   GCSFile(std::string path, bool is_cache_enable, uint64_t buffer_size,
174           ReadFn read_fn)
175       : path(path),
176         is_cache_enable(is_cache_enable),
177         buffer_size(buffer_size),
178         read_fn(std::move(read_fn)),
179         buffer_mutex(),
180         buffer_start(0),
181         buffer_end_is_past_eof(false),
182         buffer() {}
183 } GCSFile;
184 
Cleanup(TF_RandomAccessFile * file)185 void Cleanup(TF_RandomAccessFile* file) {
186   auto gcs_file = static_cast<GCSFile*>(file->plugin_file);
187   delete gcs_file;
188 }
189 
190 // `google-cloud-cpp` is working on a feature that we may want to use.
191 // See https://github.com/googleapis/google-cloud-cpp/issues/4013.
Read(const TF_RandomAccessFile * file,uint64_t offset,size_t n,char * buffer,TF_Status * status)192 int64_t Read(const TF_RandomAccessFile* file, uint64_t offset, size_t n,
193              char* buffer, TF_Status* status) {
194   auto gcs_file = static_cast<GCSFile*>(file->plugin_file);
195   if (gcs_file->is_cache_enable || n > gcs_file->buffer_size) {
196     return gcs_file->read_fn(gcs_file->path, offset, n, buffer, status);
197   } else {
198     absl::MutexLock l(&gcs_file->buffer_mutex);
199     size_t buffer_end = gcs_file->buffer_start + gcs_file->buffer.size();
200     size_t copy_size = 0;
201     if (offset < buffer_end && gcs_file->buffer_start) {
202       copy_size = (std::min)(n, static_cast<size_t>(buffer_end - offset));
203       memcpy(buffer,
204              gcs_file->buffer.data() + (offset - gcs_file->buffer_start),
205              copy_size);
206     }
207     bool consumed_buffer_to_eof =
208         offset + copy_size >= buffer_end && gcs_file->buffer_end_is_past_eof;
209     if (copy_size < n && !consumed_buffer_to_eof) {
210       gcs_file->buffer_start = offset + copy_size;
211       gcs_file->buffer.resize(gcs_file->buffer_size);
212       auto read_fill_buffer = gcs_file->read_fn(
213           gcs_file->path, gcs_file->buffer_start, gcs_file->buffer_size,
214           &(gcs_file->buffer[0]), status);
215       gcs_file->buffer_end_is_past_eof =
216           (TF_GetCode(status) == TF_OUT_OF_RANGE);
217       if (read_fill_buffer >= 0) gcs_file->buffer.resize(read_fill_buffer);
218       if (TF_GetCode(status) != TF_OK &&
219           TF_GetCode(status) != TF_OUT_OF_RANGE) {
220         // Empty the buffer to avoid caching bad reads.
221         gcs_file->buffer.resize(0);
222         return -1;
223       }
224       size_t remaining_copy =
225           (std::min)(n - copy_size, gcs_file->buffer.size());
226       memcpy(buffer + copy_size, gcs_file->buffer.data(), remaining_copy);
227       copy_size += remaining_copy;
228     }
229     if (copy_size < n) {
230       // Forget the end-of-file flag to allow for clients that poll on the
231       // same file.
232       gcs_file->buffer_end_is_past_eof = false;
233       TF_SetStatus(status, TF_OUT_OF_RANGE, "Read less bytes than requested");
234       return copy_size;
235     }
236     TF_SetStatus(status, TF_OK, "");
237     return copy_size;
238   }
239 }
240 
241 }  // namespace tf_random_access_file
242 
243 // SECTION 2. Implementation for `TF_WritableFile`
244 // ----------------------------------------------------------------------------
245 namespace tf_writable_file {
246 typedef struct GCSFile {
247   const std::string bucket;
248   const std::string object;
249   gcs::Client* gcs_client;  // not owned
250   TempFile outfile;
251   bool sync_need;
252   // `offset` tells us how many bytes of this file are already uploaded to
253   // server. If `offset == -1`, we always upload the entire temporary file.
254   int64_t offset;
255 } GCSFile;
256 
SyncImpl(const std::string & bucket,const std::string & object,int64_t * offset,TempFile * outfile,gcs::Client * gcs_client,TF_Status * status)257 static void SyncImpl(const std::string& bucket, const std::string& object,
258                      int64_t* offset, TempFile* outfile,
259                      gcs::Client* gcs_client, TF_Status* status) {
260   outfile->flush();
261   // `*offset == 0` means this file does not exist on the server.
262   if (*offset == -1 || *offset == 0) {
263     // UploadFile will automatically switch to resumable upload based on Client
264     // configuration.
265     auto metadata = gcs_client->UploadFile(outfile->getName(), bucket, object,
266                                            gcs::Fields("size"));
267     if (!metadata) {
268       TF_SetStatusFromGCSStatus(metadata.status(), status);
269       return;
270     }
271     if (*offset == 0) {
272       if (!outfile->truncate()) {
273         TF_SetStatus(status, TF_INTERNAL,
274                      "Could not truncate internal temporary file.");
275         return;
276       }
277       *offset = static_cast<int64_t>(metadata->size());
278     }
279     outfile->clear();
280     outfile->seekp(0, std::ios::end);
281     TF_SetStatus(status, TF_OK, "");
282   } else {
283     std::string temporary_object =
284         gcs::CreateRandomPrefixName("tf_writable_file_gcs");
285     auto metadata = gcs_client->UploadFile(outfile->getName(), bucket,
286                                            temporary_object, gcs::Fields(""));
287     if (!metadata) {
288       TF_SetStatusFromGCSStatus(metadata.status(), status);
289       return;
290     }
291     TF_VLog(3, "AppendObject: gs://%s/%s to gs://%s/%s", bucket.c_str(),
292             temporary_object.c_str(), bucket.c_str(), object.c_str());
293     const std::vector<gcs::ComposeSourceObject> source_objects = {
294         {object, {}, {}}, {temporary_object, {}, {}}};
295     metadata = gcs_client->ComposeObject(bucket, source_objects, object,
296                                          gcs::Fields("size"));
297     if (!metadata) {
298       TF_SetStatusFromGCSStatus(metadata.status(), status);
299       return;
300     }
301     // We have to delete the temporary object after composing.
302     auto delete_status = gcs_client->DeleteObject(bucket, temporary_object);
303     if (!delete_status.ok()) {
304       TF_SetStatusFromGCSStatus(delete_status, status);
305       return;
306     }
307     // We truncate the data that are already uploaded.
308     if (!outfile->truncate()) {
309       TF_SetStatus(status, TF_INTERNAL,
310                    "Could not truncate internal temporary file.");
311       return;
312     }
313     *offset = static_cast<int64_t>(metadata->size());
314     TF_SetStatus(status, TF_OK, "");
315   }
316 }
317 
Cleanup(TF_WritableFile * file)318 void Cleanup(TF_WritableFile* file) {
319   auto gcs_file = static_cast<GCSFile*>(file->plugin_file);
320   delete gcs_file;
321 }
322 
Append(const TF_WritableFile * file,const char * buffer,size_t n,TF_Status * status)323 void Append(const TF_WritableFile* file, const char* buffer, size_t n,
324             TF_Status* status) {
325   auto gcs_file = static_cast<GCSFile*>(file->plugin_file);
326   if (!gcs_file->outfile.is_open()) {
327     TF_SetStatus(status, TF_FAILED_PRECONDITION,
328                  "The internal temporary file is not writable.");
329     return;
330   }
331   TF_VLog(3, "Append: gs://%s/%s size %u", gcs_file->bucket.c_str(),
332           gcs_file->object.c_str(), n);
333   gcs_file->sync_need = true;
334   gcs_file->outfile.write(buffer, n);
335   if (!gcs_file->outfile)
336     TF_SetStatus(status, TF_INTERNAL,
337                  "Could not append to the internal temporary file.");
338   else
339     TF_SetStatus(status, TF_OK, "");
340 }
341 
Tell(const TF_WritableFile * file,TF_Status * status)342 int64_t Tell(const TF_WritableFile* file, TF_Status* status) {
343   auto gcs_file = static_cast<GCSFile*>(file->plugin_file);
344   int64_t position = int64_t(gcs_file->outfile.tellp());
345   if (position == -1)
346     TF_SetStatus(status, TF_INTERNAL,
347                  "tellp on the internal temporary file failed");
348   else
349     TF_SetStatus(status, TF_OK, "");
350   return position == -1
351              ? -1
352              : position + (gcs_file->offset == -1 ? 0 : gcs_file->offset);
353 }
354 
Flush(const TF_WritableFile * file,TF_Status * status)355 void Flush(const TF_WritableFile* file, TF_Status* status) {
356   auto gcs_file = static_cast<GCSFile*>(file->plugin_file);
357   if (gcs_file->sync_need) {
358     TF_VLog(3, "Flush started: gs://%s/%s", gcs_file->bucket.c_str(),
359             gcs_file->object.c_str());
360     if (!gcs_file->outfile) {
361       TF_SetStatus(status, TF_INTERNAL,
362                    "Could not append to the internal temporary file.");
363       return;
364     }
365     SyncImpl(gcs_file->bucket, gcs_file->object, &gcs_file->offset,
366              &gcs_file->outfile, gcs_file->gcs_client, status);
367     TF_VLog(3, "Flush finished: gs://%s/%s", gcs_file->bucket.c_str(),
368             gcs_file->object.c_str());
369     if (TF_GetCode(status) != TF_OK) return;
370     gcs_file->sync_need = false;
371   } else {
372     TF_SetStatus(status, TF_OK, "");
373   }
374 }
375 
Sync(const TF_WritableFile * file,TF_Status * status)376 void Sync(const TF_WritableFile* file, TF_Status* status) {
377   auto gcs_file = static_cast<GCSFile*>(file->plugin_file);
378   TF_VLog(3, "Sync: gs://%s/%s", gcs_file->bucket.c_str(),
379           gcs_file->object.c_str());
380   Flush(file, status);
381 }
382 
Close(const TF_WritableFile * file,TF_Status * status)383 void Close(const TF_WritableFile* file, TF_Status* status) {
384   auto gcs_file = static_cast<GCSFile*>(file->plugin_file);
385   TF_VLog(3, "Close: gs://%s/%s", gcs_file->bucket.c_str(),
386           gcs_file->object.c_str());
387   if (gcs_file->sync_need) {
388     Flush(file, status);
389   }
390   gcs_file->outfile.close();
391 }
392 
393 }  // namespace tf_writable_file
394 
395 // SECTION 3. Implementation for `TF_ReadOnlyMemoryRegion`
396 // ----------------------------------------------------------------------------
397 namespace tf_read_only_memory_region {
// Immutable description of a memory region stored in
// `TF_ReadOnlyMemoryRegion::plugin_memory_region`.
struct GCSMemoryRegion {
  // Start of the region; released with `plugin_memory_free` on cleanup.
  const void* const address;
  // Size of the region in bytes.
  const uint64_t length;
};
402 
Cleanup(TF_ReadOnlyMemoryRegion * region)403 void Cleanup(TF_ReadOnlyMemoryRegion* region) {
404   auto r = static_cast<GCSMemoryRegion*>(region->plugin_memory_region);
405   plugin_memory_free(const_cast<void*>(r->address));
406   delete r;
407 }
408 
Data(const TF_ReadOnlyMemoryRegion * region)409 const void* Data(const TF_ReadOnlyMemoryRegion* region) {
410   auto r = static_cast<GCSMemoryRegion*>(region->plugin_memory_region);
411   return r->address;
412 }
413 
Length(const TF_ReadOnlyMemoryRegion * region)414 uint64_t Length(const TF_ReadOnlyMemoryRegion* region) {
415   auto r = static_cast<GCSMemoryRegion*>(region->plugin_memory_region);
416   return r->length;
417 }
418 
419 }  // namespace tf_read_only_memory_region
420 
421 // SECTION 4. Implementation for `TF_Filesystem`, the actual filesystem
422 // ----------------------------------------------------------------------------
423 namespace tf_gcs_filesystem {
// TODO(vnvo2409): Use partial response for better performance.
425 // TODO(vnvo2409): We could do some cleanups like `return TF_SetStatus`.
426 // TODO(vnvo2409): Refactor the filesystem implementation when
427 // https://github.com/googleapis/google-cloud-cpp/issues/4482 is done.
GCSFile(google::cloud::storage::Client && gcs_client)428 GCSFile::GCSFile(google::cloud::storage::Client&& gcs_client)
429     : gcs_client(gcs_client), block_cache_lock() {
430   const char* append_mode = std::getenv(kAppendMode);
431   compose = (append_mode != nullptr) && (!strcmp(kAppendMode, append_mode));
432 
433   uint64_t value;
434   block_size = kDefaultBlockSize;
435   size_t max_bytes = kDefaultMaxCacheSize;
436   uint64_t max_staleness = kDefaultMaxStaleness;
437 
438   // Apply the overrides for the block size (MB), max bytes (MB), and max
439   // staleness (seconds) if provided.
440   const char* block_size_env = std::getenv(kBlockSize);
441   if (block_size_env && absl::SimpleAtoi(block_size_env, &value)) {
442     block_size = value * 1024 * 1024;
443   }
444   const char* max_bytes_env = std::getenv(kMaxCacheSize);
445   if (max_bytes_env && absl::SimpleAtoi(max_bytes_env, &value)) {
446     max_bytes = static_cast<size_t>(value * 1024 * 1024);
447   }
448   const char* max_staleness_env = std::getenv(kMaxStaleness);
449   if (max_staleness_env && absl::SimpleAtoi(max_staleness_env, &value)) {
450     max_staleness = value;
451   }
452   TF_VLog(1, "GCS cache max size = %u ; block size = %u ; max staleness = %u",
453           max_bytes, block_size, max_staleness);
454 
455   file_block_cache = std::make_unique<RamFileBlockCache>(
456       block_size, max_bytes, max_staleness,
457       [this](const std::string& filename, size_t offset, size_t buffer_size,
458              char* buffer, TF_Status* status) {
459         return LoadBufferFromGCS(filename, offset, buffer_size, buffer, this,
460                                  status);
461       });
462 
463   uint64_t stat_cache_max_age = kStatCacheDefaultMaxAge;
464   size_t stat_cache_max_entries = kStatCacheDefaultMaxEntries;
465   const char* stat_cache_max_age_env = std::getenv(kStatCacheMaxAge);
466   if (stat_cache_max_age_env &&
467       absl::SimpleAtoi(stat_cache_max_age_env, &value)) {
468     stat_cache_max_age = value;
469   }
470   const char* stat_cache_max_entries_env = std::getenv(kStatCacheMaxEntries);
471   if (stat_cache_max_entries_env &&
472       absl::SimpleAtoi(stat_cache_max_entries_env, &value)) {
473     stat_cache_max_entries = static_cast<size_t>(value);
474   }
475   stat_cache = std::make_unique<ExpiringLRUCache<GcsFileStat>>(
476       stat_cache_max_age, stat_cache_max_entries);
477 }
478 
GCSFile(google::cloud::storage::Client && gcs_client,bool compose,uint64_t block_size,size_t max_bytes,uint64_t max_staleness,uint64_t stat_cache_max_age,size_t stat_cache_max_entries)479 GCSFile::GCSFile(google::cloud::storage::Client&& gcs_client, bool compose,
480                  uint64_t block_size, size_t max_bytes, uint64_t max_staleness,
481                  uint64_t stat_cache_max_age, size_t stat_cache_max_entries)
482     : gcs_client(gcs_client),
483       compose(compose),
484       block_cache_lock(),
485       block_size(block_size) {
486   file_block_cache = std::make_unique<RamFileBlockCache>(
487       block_size, max_bytes, max_staleness,
488       [this](const std::string& filename, size_t offset, size_t buffer_size,
489              char* buffer, TF_Status* status) {
490         return LoadBufferFromGCS(filename, offset, buffer_size, buffer, this,
491                                  status);
492       });
493   stat_cache = std::make_unique<ExpiringLRUCache<GcsFileStat>>(
494       stat_cache_max_age, stat_cache_max_entries);
495 }
496 
InitTest(TF_Filesystem * filesystem,bool compose,uint64_t block_size,size_t max_bytes,uint64_t max_staleness,uint64_t stat_cache_max_age,size_t stat_cache_max_entries,TF_Status * status)497 void InitTest(TF_Filesystem* filesystem, bool compose, uint64_t block_size,
498               size_t max_bytes, uint64_t max_staleness,
499               uint64_t stat_cache_max_age, size_t stat_cache_max_entries,
500               TF_Status* status) {
501   google::cloud::StatusOr<gcs::Client> client =
502       gcs::Client::CreateDefaultClient();
503   if (!client) {
504     TF_SetStatusFromGCSStatus(client.status(), status);
505     return;
506   }
507 
508   filesystem->plugin_filesystem =
509       new GCSFile(std::move(client.value()), compose, block_size, max_bytes,
510                   max_staleness, stat_cache_max_age, stat_cache_max_entries);
511   TF_SetStatus(status, TF_OK, "");
512 }
513 
Init(TF_Filesystem * filesystem,TF_Status * status)514 void Init(TF_Filesystem* filesystem, TF_Status* status) {
515   google::cloud::StatusOr<gcs::Client> client =
516       gcs::Client::CreateDefaultClient();
517   if (!client) {
518     TF_SetStatusFromGCSStatus(client.status(), status);
519     return;
520   }
521 
522   filesystem->plugin_filesystem = new GCSFile(std::move(client.value()));
523   TF_SetStatus(status, TF_OK, "");
524 }
525 
Cleanup(TF_Filesystem * filesystem)526 void Cleanup(TF_Filesystem* filesystem) {
527   auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
528   delete gcs_file;
529 }
530 
UncachedStatForObject(const std::string & bucket,const std::string & object,GcsFileStat * stat,gcs::Client * gcs_client,TF_Status * status)531 static void UncachedStatForObject(const std::string& bucket,
532                                   const std::string& object, GcsFileStat* stat,
533                                   gcs::Client* gcs_client, TF_Status* status) {
534   auto metadata = gcs_client->GetObjectMetadata(
535       bucket, object, gcs::Fields("generation,size,timeStorageClassUpdated"));
536   if (!metadata) return TF_SetStatusFromGCSStatus(metadata.status(), status);
537   stat->generation_number = metadata->generation();
538   stat->base.length = metadata->size();
539   stat->base.mtime_nsec =
540       metadata->time_storage_class_updated().time_since_epoch().count();
541   stat->base.is_directory = object.back() == '/';
542   TF_VLog(1,
543           "Stat of: gs://%s/%s --  length: %u generation: %u; mtime_nsec: %u;",
544           bucket.c_str(), object.c_str(), stat->base.length,
545           stat->generation_number, stat->base.mtime_nsec);
546   return TF_SetStatus(status, TF_OK, "");
547 }
548 
549 // TODO(vnvo2409): Implement later
NewRandomAccessFile(const TF_Filesystem * filesystem,const char * path,TF_RandomAccessFile * file,TF_Status * status)550 void NewRandomAccessFile(const TF_Filesystem* filesystem, const char* path,
551                          TF_RandomAccessFile* file, TF_Status* status) {
552   std::string bucket, object;
553   ParseGCSPath(path, false, &bucket, &object, status);
554   if (TF_GetCode(status) != TF_OK) return;
555 
556   auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
557   bool is_cache_enabled;
558   {
559     absl::MutexLock l(&gcs_file->block_cache_lock);
560     is_cache_enabled = gcs_file->file_block_cache->IsCacheEnabled();
561   }
562   auto read_fn = [gcs_file, is_cache_enabled, bucket, object](
563                      const std::string& path, uint64_t offset, size_t n,
564                      char* buffer, TF_Status* status) -> int64_t {
565     int64_t read = 0;
566     if (is_cache_enabled) {
567       absl::ReaderMutexLock l(&gcs_file->block_cache_lock);
568       GcsFileStat stat;
569       gcs_file->stat_cache->LookupOrCompute(
570           path, &stat,
571           [gcs_file, bucket, object](const std::string& path, GcsFileStat* stat,
572                                      TF_Status* status) {
573             UncachedStatForObject(bucket, object, stat, &gcs_file->gcs_client,
574                                   status);
575           },
576           status);
577       if (TF_GetCode(status) != TF_OK) return -1;
578       if (!gcs_file->file_block_cache->ValidateAndUpdateFileSignature(
579               path, stat.generation_number)) {
580         TF_VLog(
581             1,
582             "File signature has been changed. Refreshing the cache. Path: %s",
583             path.c_str());
584       }
585       read = gcs_file->file_block_cache->Read(path, offset, n, buffer, status);
586     } else {
587       read = LoadBufferFromGCS(path, offset, n, buffer, gcs_file, status);
588     }
589     if (TF_GetCode(status) != TF_OK) return -1;
590     if (read < n)
591       TF_SetStatus(status, TF_OUT_OF_RANGE, "Read less bytes than requested");
592     else
593       TF_SetStatus(status, TF_OK, "");
594     return read;
595   };
596   file->plugin_file = new tf_random_access_file::GCSFile(
597       std::move(path), is_cache_enabled, gcs_file->block_size, read_fn);
598   TF_SetStatus(status, TF_OK, "");
599 }
600 
NewWritableFile(const TF_Filesystem * filesystem,const char * path,TF_WritableFile * file,TF_Status * status)601 void NewWritableFile(const TF_Filesystem* filesystem, const char* path,
602                      TF_WritableFile* file, TF_Status* status) {
603   std::string bucket, object;
604   ParseGCSPath(path, false, &bucket, &object, status);
605   if (TF_GetCode(status) != TF_OK) return;
606 
607   auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
608   char* temp_file_name = TF_GetTempFileName("");
609   file->plugin_file = new tf_writable_file::GCSFile(
610       {std::move(bucket), std::move(object), &gcs_file->gcs_client,
611        TempFile(temp_file_name, std::ios::binary | std::ios::out), true,
612        (gcs_file->compose ? 0 : -1)});
613   // We are responsible for freeing the pointer returned by TF_GetTempFileName
614   free(temp_file_name);
615   TF_VLog(3, "GcsWritableFile: %s", path);
616   TF_SetStatus(status, TF_OK, "");
617 }
618 
NewAppendableFile(const TF_Filesystem * filesystem,const char * path,TF_WritableFile * file,TF_Status * status)619 void NewAppendableFile(const TF_Filesystem* filesystem, const char* path,
620                        TF_WritableFile* file, TF_Status* status) {
621   std::string bucket, object;
622   ParseGCSPath(path, false, &bucket, &object, status);
623   if (TF_GetCode(status) != TF_OK) return;
624 
625   auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
626   char* temp_file_name_c_str = TF_GetTempFileName("");
627   std::string temp_file_name(temp_file_name_c_str);  // To prevent memory-leak
628   free(temp_file_name_c_str);
629 
630   if (!gcs_file->compose) {
631     auto gcs_status =
632         gcs_file->gcs_client.DownloadToFile(bucket, object, temp_file_name);
633     TF_SetStatusFromGCSStatus(gcs_status, status);
634     auto status_code = TF_GetCode(status);
635     if (status_code != TF_OK && status_code != TF_NOT_FOUND) return;
636     // If this file does not exist on server, we will need to sync it.
637     bool sync_need = (status_code == TF_NOT_FOUND);
638     file->plugin_file = new tf_writable_file::GCSFile(
639         {std::move(bucket), std::move(object), &gcs_file->gcs_client,
640          TempFile(temp_file_name, std::ios::binary | std::ios::app), sync_need,
641          -1});
642   } else {
643     // If compose is true, we do not download anything.
644     // Instead we only check if this file exists on server or not.
645     auto metadata = gcs_file->gcs_client.GetObjectMetadata(bucket, object,
646                                                            gcs::Fields("size"));
647     TF_SetStatusFromGCSStatus(metadata.status(), status);
648     if (TF_GetCode(status) == TF_OK) {
649       file->plugin_file = new tf_writable_file::GCSFile(
650           {std::move(bucket), std::move(object), &gcs_file->gcs_client,
651            TempFile(temp_file_name, std::ios::binary | std::ios::trunc), false,
652            static_cast<int64_t>(metadata->size())});
653     } else if (TF_GetCode(status) == TF_NOT_FOUND) {
654       file->plugin_file = new tf_writable_file::GCSFile(
655           {std::move(bucket), std::move(object), &gcs_file->gcs_client,
656            TempFile(temp_file_name, std::ios::binary | std::ios::trunc), true,
657            0});
658     } else {
659       return;
660     }
661   }
662   TF_VLog(3, "GcsWritableFile: %s with existing file %s", path,
663           temp_file_name.c_str());
664   TF_SetStatus(status, TF_OK, "");
665 }
666 
667 // TODO(vnvo2409): We could download into a local temporary file and use
668 // memory-mapping.
NewReadOnlyMemoryRegionFromFile(const TF_Filesystem * filesystem,const char * path,TF_ReadOnlyMemoryRegion * region,TF_Status * status)669 void NewReadOnlyMemoryRegionFromFile(const TF_Filesystem* filesystem,
670                                      const char* path,
671                                      TF_ReadOnlyMemoryRegion* region,
672                                      TF_Status* status) {
673   std::string bucket, object;
674   ParseGCSPath(path, false, &bucket, &object, status);
675   if (TF_GetCode(status) != TF_OK) return;
676 
677   auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
678   auto metadata = gcs_file->gcs_client.GetObjectMetadata(bucket, object,
679                                                          gcs::Fields("size"));
680   if (!metadata) {
681     TF_SetStatusFromGCSStatus(metadata.status(), status);
682     return;
683   }
684 
685   TF_RandomAccessFile reader;
686   NewRandomAccessFile(filesystem, path, &reader, status);
687   if (TF_GetCode(status) != TF_OK) return;
688   char* buffer = static_cast<char*>(plugin_memory_allocate(metadata->size()));
689   int64_t read =
690       tf_random_access_file::Read(&reader, 0, metadata->size(), buffer, status);
691   tf_random_access_file::Cleanup(&reader);
692   if (TF_GetCode(status) != TF_OK) return;
693 
694   if (read > 0 && buffer) {
695     region->plugin_memory_region =
696         new tf_read_only_memory_region::GCSMemoryRegion(
697             {buffer, static_cast<uint64_t>(read)});
698     TF_SetStatus(status, TF_OK, "");
699   } else if (read == 0) {
700     TF_SetStatus(status, TF_INVALID_ARGUMENT, "File is empty");
701   }
702 }
703 
StatForObject(GCSFile * gcs_file,const std::string & path,const std::string & bucket,const std::string & object,GcsFileStat * stat,TF_Status * status)704 static void StatForObject(GCSFile* gcs_file, const std::string& path,
705                           const std::string& bucket, const std::string& object,
706                           GcsFileStat* stat, TF_Status* status) {
707   if (object.empty())
708     return TF_SetStatus(
709         status, TF_INVALID_ARGUMENT,
710         absl::StrCat("'object' must be a non-empty string. (File: ", path, ")")
711             .c_str());
712   TF_SetStatus(status, TF_OK, "");
713   gcs_file->stat_cache->LookupOrCompute(
714       path, stat,
715       [gcs_file, bucket, object](const std::string& path, GcsFileStat* stat,
716                                  TF_Status* status) {
717         UncachedStatForObject(bucket, object, stat, &gcs_file->gcs_client,
718                               status);
719       },
720       status);
721 }
722 
ObjectExists(GCSFile * gcs_file,const std::string & path,const std::string & bucket,const std::string & object,TF_Status * status)723 static bool ObjectExists(GCSFile* gcs_file, const std::string& path,
724                          const std::string& bucket, const std::string& object,
725                          TF_Status* status) {
726   GcsFileStat stat;
727   StatForObject(gcs_file, path, bucket, object, &stat, status);
728   if (TF_GetCode(status) != TF_OK && TF_GetCode(status) != TF_NOT_FOUND)
729     return false;
730   if (TF_GetCode(status) == TF_NOT_FOUND) {
731     TF_SetStatus(status, TF_OK, "");
732     return false;
733   }
734   return !stat.base.is_directory;
735 }
736 
BucketExists(GCSFile * gcs_file,const std::string & bucket,TF_Status * status)737 static bool BucketExists(GCSFile* gcs_file, const std::string& bucket,
738                          TF_Status* status) {
739   auto metadata =
740       gcs_file->gcs_client.GetBucketMetadata(bucket, gcs::Fields(""));
741   TF_SetStatusFromGCSStatus(metadata.status(), status);
742   if (TF_GetCode(status) != TF_OK && TF_GetCode(status) != TF_NOT_FOUND)
743     return false;
744   if (TF_GetCode(status) == TF_NOT_FOUND) {
745     TF_SetStatus(status, TF_OK, "");
746     return false;
747   }
748   return true;
749 }
750 
GetChildrenBounded(GCSFile * gcs_file,std::string dir,uint64_t max_results,bool recursive,bool include_self_directory_marker,TF_Status * status)751 static std::vector<std::string> GetChildrenBounded(
752     GCSFile* gcs_file, std::string dir, uint64_t max_results, bool recursive,
753     bool include_self_directory_marker, TF_Status* status) {
754   std::string bucket, prefix;
755   MaybeAppendSlash(&dir);
756   ParseGCSPath(dir, true, &bucket, &prefix, status);
757 
758   std::vector<std::string> result;
759   uint64_t count = 0;
760   std::string delimiter = recursive ? "" : "/";
761 
762   for (auto&& item : gcs_file->gcs_client.ListObjectsAndPrefixes(
763            bucket, gcs::Prefix(prefix), gcs::Delimiter(delimiter),
764            gcs::Fields("items(name),prefixes"))) {
765     if (count == max_results) {
766       TF_SetStatus(status, TF_OK, "");
767       return result;
768     }
769     if (!item) {
770       TF_SetStatusFromGCSStatus(item.status(), status);
771       return result;
772     }
773     auto value = *std::move(item);
774     std::string children = absl::holds_alternative<std::string>(value)
775                                ? absl::get<std::string>(value)
776                                : absl::get<gcs::ObjectMetadata>(value).name();
777     auto pos = children.find(prefix);
778     if (pos != 0) {
779       TF_SetStatus(status, TF_INTERNAL,
780                    absl::StrCat("Unexpected response: the returned file name ",
781                                 children, " doesn't match the prefix ", prefix)
782                        .c_str());
783       return result;
784     }
785     children.erase(0, prefix.length());
786     if (!children.empty() || include_self_directory_marker) {
787       result.emplace_back(children);
788     }
789     ++count;
790   }
791 
792   return result;
793 }
794 
FolderExists(GCSFile * gcs_file,std::string dir,TF_Status * status)795 static bool FolderExists(GCSFile* gcs_file, std::string dir,
796                          TF_Status* status) {
797   ExpiringLRUCache<GcsFileStat>::ComputeFunc compute_func =
798       [gcs_file](const std::string& dir, GcsFileStat* stat, TF_Status* status) {
799         auto children =
800             GetChildrenBounded(gcs_file, dir, 1, true, true, status);
801         if (TF_GetCode(status) != TF_OK) return;
802         if (!children.empty()) {
803           stat->base = {0, 0, true};
804           return TF_SetStatus(status, TF_OK, "");
805         } else {
806           return TF_SetStatus(status, TF_INVALID_ARGUMENT, "Not a directory!");
807         }
808       };
809   GcsFileStat stat;
810   MaybeAppendSlash(&dir);
811   gcs_file->stat_cache->LookupOrCompute(dir, &stat, compute_func, status);
812   if (TF_GetCode(status) != TF_OK && TF_GetCode(status) != TF_INVALID_ARGUMENT)
813     return false;
814   if (TF_GetCode(status) == TF_INVALID_ARGUMENT) {
815     TF_SetStatus(status, TF_OK, "");
816     return false;
817   }
818   return true;
819 }
820 
ClearFileCaches(GCSFile * gcs_file,const std::string & path)821 static void ClearFileCaches(GCSFile* gcs_file, const std::string& path) {
822   absl::ReaderMutexLock l(&gcs_file->block_cache_lock);
823   gcs_file->file_block_cache->RemoveFile(path);
824   gcs_file->stat_cache->Delete(path);
825 }
826 
PathExists(const TF_Filesystem * filesystem,const char * path,TF_Status * status)827 void PathExists(const TF_Filesystem* filesystem, const char* path,
828                 TF_Status* status) {
829   std::string bucket, object;
830   ParseGCSPath(path, true, &bucket, &object, status);
831   if (TF_GetCode(status) != TF_OK) return;
832 
833   auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
834   if (object.empty()) {
835     bool result = BucketExists(gcs_file, bucket, status);
836     if (result) return TF_SetStatus(status, TF_OK, "");
837   }
838 
839   GcsFileStat stat;
840   StatForObject(gcs_file, path, bucket, object, &stat, status);
841   if (TF_GetCode(status) != TF_NOT_FOUND) return;
842 
843   bool result = FolderExists(gcs_file, path, status);
844   if (TF_GetCode(status) != TF_OK || (TF_GetCode(status) == TF_OK && result))
845     return;
846   return TF_SetStatus(
847       status, TF_NOT_FOUND,
848       absl::StrCat("The path ", path, " does not exist.").c_str());
849 }
850 
CreateDir(const TF_Filesystem * filesystem,const char * path,TF_Status * status)851 void CreateDir(const TF_Filesystem* filesystem, const char* path,
852                TF_Status* status) {
853   std::string dir = path;
854   MaybeAppendSlash(&dir);
855   TF_VLog(3,
856           "CreateDir: creating directory with path: %s and "
857           "path_with_slash: %s",
858           path, dir.c_str());
859   std::string bucket, object;
860   ParseGCSPath(dir, true, &bucket, &object, status);
861   if (TF_GetCode(status) != TF_OK) return;
862   auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
863   if (object.empty()) {
864     bool is_directory = BucketExists(gcs_file, bucket, status);
865     if (TF_GetCode(status) != TF_OK) return;
866     if (!is_directory)
867       TF_SetStatus(status, TF_NOT_FOUND,
868                    absl::StrCat("The specified bucket ", dir, " was not found.")
869                        .c_str());
870     return;
871   }
872 
873   PathExists(filesystem, dir.c_str(), status);
874   if (TF_GetCode(status) == TF_OK) {
875     // Use the original name for a correct error here.
876     TF_VLog(3, "CreateDir: directory already exists, not uploading %s", path);
877     return TF_SetStatus(status, TF_ALREADY_EXISTS, path);
878   }
879 
880   auto metadata = gcs_file->gcs_client.InsertObject(
881       bucket, object, "",
882       // Adding this parameter means HTTP_CODE_PRECONDITION_FAILED
883       // will be returned if the object already exists, so avoid reuploading.
884       gcs::IfGenerationMatch(0), gcs::Fields(""));
885   TF_SetStatusFromGCSStatus(metadata.status(), status);
886   if (TF_GetCode(status) == TF_FAILED_PRECONDITION)
887     TF_SetStatus(status, TF_ALREADY_EXISTS, path);
888 }
889 
890 // TODO(vnvo2409): `RecursivelyCreateDir` should use `CreateDir` instead of the
891 // default implementation. Because we could create an empty object whose
892 // key is equal to the `path` and Google Cloud Console will automatically
893 // display it as a directory tree.
894 
DeleteFile(const TF_Filesystem * filesystem,const char * path,TF_Status * status)895 void DeleteFile(const TF_Filesystem* filesystem, const char* path,
896                 TF_Status* status) {
897   std::string bucket, object;
898   ParseGCSPath(path, false, &bucket, &object, status);
899   if (TF_GetCode(status) != TF_OK) return;
900   auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
901   auto gcs_status = gcs_file->gcs_client.DeleteObject(bucket, object);
902   TF_SetStatusFromGCSStatus(gcs_status, status);
903   if (TF_GetCode(status) == TF_OK) ClearFileCaches(gcs_file, path);
904 }
905 
906 // Checks that the directory is empty (i.e no objects with this prefix exist).
907 // Deletes the GCS directory marker if it exists.
DeleteDir(const TF_Filesystem * filesystem,const char * path,TF_Status * status)908 void DeleteDir(const TF_Filesystem* filesystem, const char* path,
909                TF_Status* status) {
910   // A directory is considered empty either if there are no matching objects
911   // with the corresponding name prefix or if there is exactly one matching
912   // object and it is the directory marker. Therefore we need to retrieve
913   // at most two children for the prefix to detect if a directory is empty.
914   auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
915   auto childrens = GetChildrenBounded(gcs_file, path, 2, true, true, status);
916   if (TF_GetCode(status) != TF_OK) return;
917   if (childrens.size() > 1 || (childrens.size() == 1 && !childrens[0].empty()))
918     return TF_SetStatus(status, TF_FAILED_PRECONDITION,
919                         "Cannot delete a non-empty directory.");
920   if (childrens.size() == 1 && childrens[0].empty()) {
921     // This is the directory marker object. Delete it.
922     std::string dir = path;
923     MaybeAppendSlash(&dir);
924     DeleteFile(filesystem, dir.c_str(), status);
925     return;
926   }
927   TF_SetStatus(status, TF_OK, "");
928 }
929 
CopyFile(const TF_Filesystem * filesystem,const char * src,const char * dst,TF_Status * status)930 void CopyFile(const TF_Filesystem* filesystem, const char* src, const char* dst,
931               TF_Status* status) {
932   std::string bucket_src, object_src;
933   ParseGCSPath(src, false, &bucket_src, &object_src, status);
934   if (TF_GetCode(status) != TF_OK) return;
935 
936   std::string bucket_dst, object_dst;
937   ParseGCSPath(dst, false, &bucket_dst, &object_dst, status);
938   if (TF_GetCode(status) != TF_OK) return;
939 
940   auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
941   auto metadata = gcs_file->gcs_client.RewriteObjectBlocking(
942       bucket_src, object_src, bucket_dst, object_dst,
943       gcs::Fields("done,rewriteToken"));
944   TF_SetStatusFromGCSStatus(metadata.status(), status);
945 }
946 
IsDirectory(const TF_Filesystem * filesystem,const char * path,TF_Status * status)947 bool IsDirectory(const TF_Filesystem* filesystem, const char* path,
948                  TF_Status* status) {
949   std::string bucket, object;
950   ParseGCSPath(path, true, &bucket, &object, status);
951   if (TF_GetCode(status) != TF_OK) return false;
952 
953   auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
954   if (object.empty()) {
955     bool result = BucketExists(gcs_file, bucket, status);
956     if (TF_GetCode(status) != TF_OK) return false;
957     if (!result)
958       TF_SetStatus(
959           status, TF_NOT_FOUND,
960           absl::StrCat("The specified bucket gs://", bucket, " was not found.")
961               .c_str());
962     return result;
963   }
964 
965   bool is_folder = FolderExists(gcs_file, path, status);
966   if (TF_GetCode(status) != TF_OK) return false;
967   if (is_folder) return true;
968 
969   bool is_object = ObjectExists(gcs_file, path, bucket, object, status);
970   if (TF_GetCode(status) != TF_OK) return false;
971   if (is_object) {
972     TF_SetStatus(
973         status, TF_FAILED_PRECONDITION,
974         absl::StrCat("The specified path ", path, " is not a directory.")
975             .c_str());
976     return false;
977   }
978   TF_SetStatus(status, TF_NOT_FOUND,
979                absl::StrCat("The path ", path, " does not exist.").c_str());
980   return false;
981 }
982 
RenameObject(const TF_Filesystem * filesystem,const std::string & src,const std::string & dst,TF_Status * status)983 static void RenameObject(const TF_Filesystem* filesystem,
984                          const std::string& src, const std::string& dst,
985                          TF_Status* status) {
986   TF_VLog(3, "RenameObject: started %s to %s", src.c_str(), dst.c_str());
987   std::string bucket_src, object_src;
988   ParseGCSPath(src, false, &bucket_src, &object_src, status);
989   if (TF_GetCode(status) != TF_OK) return;
990 
991   std::string bucket_dst, object_dst;
992   ParseGCSPath(dst, false, &bucket_dst, &object_dst, status);
993   if (TF_GetCode(status) != TF_OK) return;
994 
995   auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
996   auto metadata = gcs_file->gcs_client.RewriteObjectBlocking(
997       bucket_src, object_src, bucket_dst, object_dst,
998       gcs::Fields("done,rewriteToken"));
999   TF_SetStatusFromGCSStatus(metadata.status(), status);
1000   if (TF_GetCode(status) != TF_OK) return;
1001   TF_VLog(3, "RenameObject: finished %s to %s", src.c_str(), dst.c_str());
1002 
1003   ClearFileCaches(gcs_file, dst);
1004   DeleteFile(filesystem, src.c_str(), status);
1005 }
1006 
RenameFile(const TF_Filesystem * filesystem,const char * src,const char * dst,TF_Status * status)1007 void RenameFile(const TF_Filesystem* filesystem, const char* src,
1008                 const char* dst, TF_Status* status) {
1009   if (!IsDirectory(filesystem, src, status)) {
1010     if (TF_GetCode(status) == TF_FAILED_PRECONDITION) {
1011       TF_SetStatus(status, TF_OK, "");
1012       RenameObject(filesystem, src, dst, status);
1013     }
1014     return;
1015   }
1016 
1017   auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
1018   std::vector<std::string> childrens =
1019       GetChildrenBounded(gcs_file, src, UINT64_MAX, true, true, status);
1020   if (TF_GetCode(status) != TF_OK) return;
1021 
1022   std::string src_dir = src;
1023   std::string dst_dir = dst;
1024   MaybeAppendSlash(&src_dir);
1025   MaybeAppendSlash(&dst_dir);
1026   for (const std::string& children : childrens) {
1027     RenameObject(filesystem, src_dir + children, dst_dir + children, status);
1028     if (TF_GetCode(status) != TF_OK) return;
1029   }
1030   TF_SetStatus(status, TF_OK, "");
1031 }
1032 
DeleteRecursively(const TF_Filesystem * filesystem,const char * path,uint64_t * undeleted_files,uint64_t * undeleted_dirs,TF_Status * status)1033 void DeleteRecursively(const TF_Filesystem* filesystem, const char* path,
1034                        uint64_t* undeleted_files, uint64_t* undeleted_dirs,
1035                        TF_Status* status) {
1036   if (!undeleted_files || !undeleted_dirs)
1037     return TF_SetStatus(
1038         status, TF_INTERNAL,
1039         "'undeleted_files' and 'undeleted_dirs' cannot be nullptr.");
1040   *undeleted_files = 0;
1041   *undeleted_dirs = 0;
1042   if (!IsDirectory(filesystem, path, status)) {
1043     *undeleted_dirs = 1;
1044     return;
1045   }
1046   auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
1047   std::vector<std::string> childrens =
1048       GetChildrenBounded(gcs_file, path, UINT64_MAX, true, true, status);
1049   if (TF_GetCode(status) != TF_OK) return;
1050 
1051   std::string dir = path;
1052   MaybeAppendSlash(&dir);
1053   for (const std::string& children : childrens) {
1054     const std::string& full_path = dir + children;
1055     DeleteFile(filesystem, full_path.c_str(), status);
1056     if (TF_GetCode(status) != TF_OK) {
1057       if (IsDirectory(filesystem, full_path.c_str(), status))
1058         // The object is a directory marker.
1059         (*undeleted_dirs)++;
1060       else
1061         (*undeleted_files)++;
1062     }
1063   }
1064 }
1065 
GetChildren(const TF_Filesystem * filesystem,const char * path,char *** entries,TF_Status * status)1066 int GetChildren(const TF_Filesystem* filesystem, const char* path,
1067                 char*** entries, TF_Status* status) {
1068   auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
1069   std::vector<std::string> childrens =
1070       GetChildrenBounded(gcs_file, path, UINT64_MAX, false, false, status);
1071   if (TF_GetCode(status) != TF_OK) return -1;
1072 
1073   int num_entries = childrens.size();
1074   *entries = static_cast<char**>(
1075       plugin_memory_allocate(num_entries * sizeof((*entries)[0])));
1076   for (int i = 0; i < num_entries; i++)
1077     (*entries)[i] = strdup(childrens[i].c_str());
1078   TF_SetStatus(status, TF_OK, "");
1079   return num_entries;
1080 }
1081 
Stat(const TF_Filesystem * filesystem,const char * path,TF_FileStatistics * stats,TF_Status * status)1082 void Stat(const TF_Filesystem* filesystem, const char* path,
1083           TF_FileStatistics* stats, TF_Status* status) {
1084   std::string bucket, object;
1085   ParseGCSPath(path, true, &bucket, &object, status);
1086   if (TF_GetCode(status) != TF_OK) return;
1087 
1088   auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
1089   if (object.empty()) {
1090     auto bucket_metadata =
1091         gcs_file->gcs_client.GetBucketMetadata(bucket, gcs::Fields(""));
1092     TF_SetStatusFromGCSStatus(bucket_metadata.status(), status);
1093     if (TF_GetCode(status) == TF_OK) {
1094       stats->is_directory = true;
1095       stats->length = 0;
1096       stats->mtime_nsec = 0;
1097     }
1098     return;
1099   }
1100   if (IsDirectory(filesystem, path, status)) {
1101     stats->is_directory = true;
1102     stats->length = 0;
1103     stats->mtime_nsec = 0;
1104     return TF_SetStatus(status, TF_OK, "");
1105   }
1106   if (TF_GetCode(status) == TF_FAILED_PRECONDITION) {
1107     auto metadata = gcs_file->gcs_client.GetObjectMetadata(
1108         bucket, object, gcs::Fields("size,timeStorageClassUpdated"));
1109     if (metadata) {
1110       stats->is_directory = false;
1111       stats->length = metadata.value().size();
1112       stats->mtime_nsec = metadata.value()
1113                               .time_storage_class_updated()
1114                               .time_since_epoch()
1115                               .count();
1116     }
1117     TF_SetStatusFromGCSStatus(metadata.status(), status);
1118   }
1119 }
1120 
GetFileSize(const TF_Filesystem * filesystem,const char * path,TF_Status * status)1121 int64_t GetFileSize(const TF_Filesystem* filesystem, const char* path,
1122                     TF_Status* status) {
1123   // Only validate the name.
1124   std::string bucket, object;
1125   ParseGCSPath(path, false, &bucket, &object, status);
1126   if (TF_GetCode(status) != TF_OK) return -1;
1127 
1128   TF_FileStatistics stat;
1129   Stat(filesystem, path, &stat, status);
1130   return stat.length;
1131 }
1132 
// Returns a `strdup` copy of `uri`; the name is passed through unchanged and
// `filesystem` is not consulted.
static char* TranslateName(const TF_Filesystem* filesystem, const char* uri) {
  char* translated = strdup(uri);
  return translated;
}
1136 
FlushCaches(const TF_Filesystem * filesystem)1137 static void FlushCaches(const TF_Filesystem* filesystem) {
1138   auto gcs_file = static_cast<GCSFile*>(filesystem->plugin_filesystem);
1139   absl::ReaderMutexLock l(&gcs_file->block_cache_lock);
1140   gcs_file->file_block_cache->Flush();
1141   gcs_file->stat_cache->Clear();
1142 }
1143 
1144 }  // namespace tf_gcs_filesystem
1145 
ProvideFilesystemSupportFor(TF_FilesystemPluginOps * ops,const char * uri)1146 static void ProvideFilesystemSupportFor(TF_FilesystemPluginOps* ops,
1147                                         const char* uri) {
1148   TF_SetFilesystemVersionMetadata(ops);
1149   ops->scheme = strdup(uri);
1150 
1151   ops->random_access_file_ops = static_cast<TF_RandomAccessFileOps*>(
1152       plugin_memory_allocate(TF_RANDOM_ACCESS_FILE_OPS_SIZE));
1153   ops->random_access_file_ops->cleanup = tf_random_access_file::Cleanup;
1154   ops->random_access_file_ops->read = tf_random_access_file::Read;
1155 
1156   ops->writable_file_ops = static_cast<TF_WritableFileOps*>(
1157       plugin_memory_allocate(TF_WRITABLE_FILE_OPS_SIZE));
1158   ops->writable_file_ops->cleanup = tf_writable_file::Cleanup;
1159 
1160   ops->read_only_memory_region_ops = static_cast<TF_ReadOnlyMemoryRegionOps*>(
1161       plugin_memory_allocate(TF_READ_ONLY_MEMORY_REGION_OPS_SIZE));
1162   ops->read_only_memory_region_ops->cleanup =
1163       tf_read_only_memory_region::Cleanup;
1164   ops->read_only_memory_region_ops->data = tf_read_only_memory_region::Data;
1165   ops->read_only_memory_region_ops->length = tf_read_only_memory_region::Length;
1166 
1167   ops->filesystem_ops = static_cast<TF_FilesystemOps*>(
1168       plugin_memory_allocate(TF_FILESYSTEM_OPS_SIZE));
1169   ops->filesystem_ops->init = tf_gcs_filesystem::Init;
1170   ops->filesystem_ops->cleanup = tf_gcs_filesystem::Cleanup;
1171   ops->filesystem_ops->new_random_access_file =
1172       tf_gcs_filesystem::NewRandomAccessFile;
1173   ops->filesystem_ops->new_writable_file = tf_gcs_filesystem::NewWritableFile;
1174   ops->filesystem_ops->new_appendable_file =
1175       tf_gcs_filesystem::NewAppendableFile;
1176   ops->filesystem_ops->new_read_only_memory_region_from_file =
1177       tf_gcs_filesystem::NewReadOnlyMemoryRegionFromFile;
1178   ops->filesystem_ops->create_dir = tf_gcs_filesystem::CreateDir;
1179   ops->filesystem_ops->delete_file = tf_gcs_filesystem::DeleteFile;
1180   ops->filesystem_ops->delete_dir = tf_gcs_filesystem::DeleteDir;
1181   ops->filesystem_ops->delete_recursively =
1182       tf_gcs_filesystem::DeleteRecursively;
1183   ops->filesystem_ops->copy_file = tf_gcs_filesystem::CopyFile;
1184   ops->filesystem_ops->path_exists = tf_gcs_filesystem::PathExists;
1185   ops->filesystem_ops->is_directory = tf_gcs_filesystem::IsDirectory;
1186   ops->filesystem_ops->stat = tf_gcs_filesystem::Stat;
1187   ops->filesystem_ops->get_children = tf_gcs_filesystem::GetChildren;
1188   ops->filesystem_ops->translate_name = tf_gcs_filesystem::TranslateName;
1189   ops->filesystem_ops->flush_caches = tf_gcs_filesystem::FlushCaches;
1190 }
1191 
TF_InitPlugin(TF_FilesystemPluginInfo * info)1192 void TF_InitPlugin(TF_FilesystemPluginInfo* info) {
1193   info->plugin_memory_allocate = plugin_memory_allocate;
1194   info->plugin_memory_free = plugin_memory_free;
1195   info->num_schemes = 1;
1196   info->ops = static_cast<TF_FilesystemPluginOps*>(
1197       plugin_memory_allocate(info->num_schemes * sizeof(info->ops[0])));
1198   ProvideFilesystemSupportFor(&info->ops[0], "gs");
1199 }
1200