// xref: /aosp_15_r20/external/tensorflow/tensorflow/core/platform/retrying_file_system.h (revision b6fb3261f9314811a0f4371741dbb8839866f948)
/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
15 
16 #ifndef TENSORFLOW_CORE_PLATFORM_CLOUD_RETRYING_FILE_SYSTEM_H_
17 #define TENSORFLOW_CORE_PLATFORM_CLOUD_RETRYING_FILE_SYSTEM_H_
18 
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "tensorflow/core/lib/random/random.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/errors.h"
#include "tensorflow/core/platform/file_system.h"
#include "tensorflow/core/platform/retrying_utils.h"
#include "tensorflow/core/platform/status.h"
29 
30 namespace tensorflow {
31 
32 /// A wrapper to add retry logic to another file system.
33 template <typename Underlying>
34 class RetryingFileSystem : public FileSystem {
35  public:
RetryingFileSystem(std::unique_ptr<Underlying> base_file_system,const RetryConfig & retry_config)36   RetryingFileSystem(std::unique_ptr<Underlying> base_file_system,
37                      const RetryConfig& retry_config)
38       : base_file_system_(std::move(base_file_system)),
39         retry_config_(retry_config) {}
40 
41   TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT;
42 
43   Status NewRandomAccessFile(
44       const string& filename, TransactionToken* token,
45       std::unique_ptr<RandomAccessFile>* result) override;
46 
47   Status NewWritableFile(const string& filename, TransactionToken* token,
48                          std::unique_ptr<WritableFile>* result) override;
49 
50   Status NewAppendableFile(const string& filename, TransactionToken* token,
51                            std::unique_ptr<WritableFile>* result) override;
52 
53   Status NewReadOnlyMemoryRegionFromFile(
54       const string& filename, TransactionToken* token,
55       std::unique_ptr<ReadOnlyMemoryRegion>* result) override;
56 
FileExists(const string & fname,TransactionToken * token)57   Status FileExists(const string& fname, TransactionToken* token) override {
58     return RetryingUtils::CallWithRetries(
59         [this, &fname, token]() {
60           return base_file_system_->FileExists(fname, token);
61         },
62         retry_config_);
63   }
64 
GetChildren(const string & dir,TransactionToken * token,std::vector<string> * result)65   Status GetChildren(const string& dir, TransactionToken* token,
66                      std::vector<string>* result) override {
67     return RetryingUtils::CallWithRetries(
68         [this, &dir, result, token]() {
69           return base_file_system_->GetChildren(dir, token, result);
70         },
71         retry_config_);
72   }
73 
GetMatchingPaths(const string & pattern,TransactionToken * token,std::vector<string> * result)74   Status GetMatchingPaths(const string& pattern, TransactionToken* token,
75                           std::vector<string>* result) override {
76     return RetryingUtils::CallWithRetries(
77         [this, &pattern, result, token]() {
78           return base_file_system_->GetMatchingPaths(pattern, token, result);
79         },
80         retry_config_);
81   }
82 
Stat(const string & fname,TransactionToken * token,FileStatistics * stat)83   Status Stat(const string& fname, TransactionToken* token,
84               FileStatistics* stat) override {
85     return RetryingUtils::CallWithRetries(
86         [this, &fname, stat, token]() {
87           return base_file_system_->Stat(fname, token, stat);
88         },
89         retry_config_);
90   }
91 
DeleteFile(const string & fname,TransactionToken * token)92   Status DeleteFile(const string& fname, TransactionToken* token) override {
93     return RetryingUtils::DeleteWithRetries(
94         [this, &fname, token]() {
95           return base_file_system_->DeleteFile(fname, token);
96         },
97         retry_config_);
98   }
99 
CreateDir(const string & dirname,TransactionToken * token)100   Status CreateDir(const string& dirname, TransactionToken* token) override {
101     return RetryingUtils::CallWithRetries(
102         [this, &dirname, token]() {
103           return base_file_system_->CreateDir(dirname, token);
104         },
105         retry_config_);
106   }
107 
DeleteDir(const string & dirname,TransactionToken * token)108   Status DeleteDir(const string& dirname, TransactionToken* token) override {
109     return RetryingUtils::DeleteWithRetries(
110         [this, &dirname, token]() {
111           return base_file_system_->DeleteDir(dirname, token);
112         },
113         retry_config_);
114   }
115 
GetFileSize(const string & fname,TransactionToken * token,uint64 * file_size)116   Status GetFileSize(const string& fname, TransactionToken* token,
117                      uint64* file_size) override {
118     return RetryingUtils::CallWithRetries(
119         [this, &fname, file_size, token]() {
120           return base_file_system_->GetFileSize(fname, token, file_size);
121         },
122         retry_config_);
123   }
124 
RenameFile(const string & src,const string & target,TransactionToken * token)125   Status RenameFile(const string& src, const string& target,
126                     TransactionToken* token) override {
127     return RetryingUtils::CallWithRetries(
128         [this, &src, &target, token]() {
129           return base_file_system_->RenameFile(src, target, token);
130         },
131         retry_config_);
132   }
133 
IsDirectory(const string & dirname,TransactionToken * token)134   Status IsDirectory(const string& dirname, TransactionToken* token) override {
135     return RetryingUtils::CallWithRetries(
136         [this, &dirname, token]() {
137           return base_file_system_->IsDirectory(dirname, token);
138         },
139         retry_config_);
140   }
141 
HasAtomicMove(const string & path,bool * has_atomic_move)142   Status HasAtomicMove(const string& path, bool* has_atomic_move) override {
143     // this method does not need to be retried
144     return base_file_system_->HasAtomicMove(path, has_atomic_move);
145   }
146 
CanCreateTempFile(const std::string & fname,bool * can_create_temp_file)147   Status CanCreateTempFile(const std::string& fname,
148                            bool* can_create_temp_file) override {
149     // this method does not need to be retried
150     return base_file_system_->CanCreateTempFile(fname, can_create_temp_file);
151   }
152 
DeleteRecursively(const string & dirname,TransactionToken * token,int64_t * undeleted_files,int64_t * undeleted_dirs)153   Status DeleteRecursively(const string& dirname, TransactionToken* token,
154                            int64_t* undeleted_files,
155                            int64_t* undeleted_dirs) override {
156     return RetryingUtils::DeleteWithRetries(
157         [this, &dirname, token, undeleted_files, undeleted_dirs]() {
158           return base_file_system_->DeleteRecursively(
159               dirname, token, undeleted_files, undeleted_dirs);
160         },
161         retry_config_);
162   }
163 
FlushCaches(TransactionToken * token)164   void FlushCaches(TransactionToken* token) override {
165     base_file_system_->FlushCaches(token);
166   }
167 
underlying()168   Underlying* underlying() const { return base_file_system_.get(); }
169 
170  private:
171   std::unique_ptr<Underlying> base_file_system_;
172   const RetryConfig retry_config_;
173 
174   TF_DISALLOW_COPY_AND_ASSIGN(RetryingFileSystem);
175 };
176 
177 namespace retrying_internals {
178 
179 class RetryingRandomAccessFile : public RandomAccessFile {
180  public:
RetryingRandomAccessFile(std::unique_ptr<RandomAccessFile> base_file,const RetryConfig & retry_config)181   RetryingRandomAccessFile(std::unique_ptr<RandomAccessFile> base_file,
182                            const RetryConfig& retry_config)
183       : base_file_(std::move(base_file)), retry_config_(retry_config) {}
184 
Name(StringPiece * result)185   Status Name(StringPiece* result) const override {
186     return base_file_->Name(result);
187   }
188 
Read(uint64 offset,size_t n,StringPiece * result,char * scratch)189   Status Read(uint64 offset, size_t n, StringPiece* result,
190               char* scratch) const override {
191     return RetryingUtils::CallWithRetries(
192         [this, offset, n, result, scratch]() {
193           return base_file_->Read(offset, n, result, scratch);
194         },
195         retry_config_);
196   }
197 
198  private:
199   std::unique_ptr<RandomAccessFile> base_file_;
200   const RetryConfig retry_config_;
201 };
202 
203 class RetryingWritableFile : public WritableFile {
204  public:
RetryingWritableFile(std::unique_ptr<WritableFile> base_file,const RetryConfig & retry_config)205   RetryingWritableFile(std::unique_ptr<WritableFile> base_file,
206                        const RetryConfig& retry_config)
207       : base_file_(std::move(base_file)), retry_config_(retry_config) {}
208 
~RetryingWritableFile()209   ~RetryingWritableFile() override {
210     // Makes sure the retrying version of Close() is called in the destructor.
211     Close().IgnoreError();
212   }
213 
Append(StringPiece data)214   Status Append(StringPiece data) override {
215     return RetryingUtils::CallWithRetries(
216         [this, &data]() { return base_file_->Append(data); }, retry_config_);
217   }
Close()218   Status Close() override {
219     return RetryingUtils::CallWithRetries(
220         [this]() { return base_file_->Close(); }, retry_config_);
221   }
Flush()222   Status Flush() override {
223     return RetryingUtils::CallWithRetries(
224         [this]() { return base_file_->Flush(); }, retry_config_);
225   }
Name(StringPiece * result)226   Status Name(StringPiece* result) const override {
227     return base_file_->Name(result);
228   }
Sync()229   Status Sync() override {
230     return RetryingUtils::CallWithRetries(
231         [this]() { return base_file_->Sync(); }, retry_config_);
232   }
Tell(int64_t * position)233   Status Tell(int64_t* position) override {
234     return RetryingUtils::CallWithRetries(
235         [this, &position]() { return base_file_->Tell(position); },
236         retry_config_);
237   }
238 
239  private:
240   std::unique_ptr<WritableFile> base_file_;
241   const RetryConfig retry_config_;
242 };
243 
244 }  // namespace retrying_internals
245 
246 template <typename Underlying>
NewRandomAccessFile(const string & filename,TransactionToken * token,std::unique_ptr<RandomAccessFile> * result)247 Status RetryingFileSystem<Underlying>::NewRandomAccessFile(
248     const string& filename, TransactionToken* token,
249     std::unique_ptr<RandomAccessFile>* result) {
250   std::unique_ptr<RandomAccessFile> base_file;
251   TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
252       [this, &filename, &base_file, token]() {
253         return base_file_system_->NewRandomAccessFile(filename, token,
254                                                       &base_file);
255       },
256       retry_config_));
257   result->reset(new retrying_internals::RetryingRandomAccessFile(
258       std::move(base_file), retry_config_));
259   return OkStatus();
260 }
261 
262 template <typename Underlying>
NewWritableFile(const string & filename,TransactionToken * token,std::unique_ptr<WritableFile> * result)263 Status RetryingFileSystem<Underlying>::NewWritableFile(
264     const string& filename, TransactionToken* token,
265     std::unique_ptr<WritableFile>* result) {
266   std::unique_ptr<WritableFile> base_file;
267   TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
268       [this, &filename, &base_file, token]() {
269         return base_file_system_->NewWritableFile(filename, token, &base_file);
270       },
271       retry_config_));
272   result->reset(new retrying_internals::RetryingWritableFile(
273       std::move(base_file), retry_config_));
274   return OkStatus();
275 }
276 
277 template <typename Underlying>
NewAppendableFile(const string & filename,TransactionToken * token,std::unique_ptr<WritableFile> * result)278 Status RetryingFileSystem<Underlying>::NewAppendableFile(
279     const string& filename, TransactionToken* token,
280     std::unique_ptr<WritableFile>* result) {
281   std::unique_ptr<WritableFile> base_file;
282   TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
283       [this, &filename, &base_file, token]() {
284         return base_file_system_->NewAppendableFile(filename, token,
285                                                     &base_file);
286       },
287       retry_config_));
288   result->reset(new retrying_internals::RetryingWritableFile(
289       std::move(base_file), retry_config_));
290   return OkStatus();
291 }
292 
293 template <typename Underlying>
NewReadOnlyMemoryRegionFromFile(const string & filename,TransactionToken * token,std::unique_ptr<ReadOnlyMemoryRegion> * result)294 Status RetryingFileSystem<Underlying>::NewReadOnlyMemoryRegionFromFile(
295     const string& filename, TransactionToken* token,
296     std::unique_ptr<ReadOnlyMemoryRegion>* result) {
297   return RetryingUtils::CallWithRetries(
298       [this, &filename, result, token]() {
299         return base_file_system_->NewReadOnlyMemoryRegionFromFile(
300             filename, token, result);
301       },
302       retry_config_);
303 }
304 
305 }  // namespace tensorflow
306 
307 #endif  // TENSORFLOW_CORE_PLATFORM_CLOUD_RETRYING_FILE_SYSTEM_H_
308