1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #ifndef TENSORFLOW_CORE_PLATFORM_CLOUD_RETRYING_FILE_SYSTEM_H_
17 #define TENSORFLOW_CORE_PLATFORM_CLOUD_RETRYING_FILE_SYSTEM_H_
18
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "tensorflow/core/lib/random/random.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/errors.h"
#include "tensorflow/core/platform/file_system.h"
#include "tensorflow/core/platform/retrying_utils.h"
#include "tensorflow/core/platform/status.h"
29
30 namespace tensorflow {
31
32 /// A wrapper to add retry logic to another file system.
33 template <typename Underlying>
34 class RetryingFileSystem : public FileSystem {
35 public:
RetryingFileSystem(std::unique_ptr<Underlying> base_file_system,const RetryConfig & retry_config)36 RetryingFileSystem(std::unique_ptr<Underlying> base_file_system,
37 const RetryConfig& retry_config)
38 : base_file_system_(std::move(base_file_system)),
39 retry_config_(retry_config) {}
40
41 TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT;
42
43 Status NewRandomAccessFile(
44 const string& filename, TransactionToken* token,
45 std::unique_ptr<RandomAccessFile>* result) override;
46
47 Status NewWritableFile(const string& filename, TransactionToken* token,
48 std::unique_ptr<WritableFile>* result) override;
49
50 Status NewAppendableFile(const string& filename, TransactionToken* token,
51 std::unique_ptr<WritableFile>* result) override;
52
53 Status NewReadOnlyMemoryRegionFromFile(
54 const string& filename, TransactionToken* token,
55 std::unique_ptr<ReadOnlyMemoryRegion>* result) override;
56
FileExists(const string & fname,TransactionToken * token)57 Status FileExists(const string& fname, TransactionToken* token) override {
58 return RetryingUtils::CallWithRetries(
59 [this, &fname, token]() {
60 return base_file_system_->FileExists(fname, token);
61 },
62 retry_config_);
63 }
64
GetChildren(const string & dir,TransactionToken * token,std::vector<string> * result)65 Status GetChildren(const string& dir, TransactionToken* token,
66 std::vector<string>* result) override {
67 return RetryingUtils::CallWithRetries(
68 [this, &dir, result, token]() {
69 return base_file_system_->GetChildren(dir, token, result);
70 },
71 retry_config_);
72 }
73
GetMatchingPaths(const string & pattern,TransactionToken * token,std::vector<string> * result)74 Status GetMatchingPaths(const string& pattern, TransactionToken* token,
75 std::vector<string>* result) override {
76 return RetryingUtils::CallWithRetries(
77 [this, &pattern, result, token]() {
78 return base_file_system_->GetMatchingPaths(pattern, token, result);
79 },
80 retry_config_);
81 }
82
Stat(const string & fname,TransactionToken * token,FileStatistics * stat)83 Status Stat(const string& fname, TransactionToken* token,
84 FileStatistics* stat) override {
85 return RetryingUtils::CallWithRetries(
86 [this, &fname, stat, token]() {
87 return base_file_system_->Stat(fname, token, stat);
88 },
89 retry_config_);
90 }
91
DeleteFile(const string & fname,TransactionToken * token)92 Status DeleteFile(const string& fname, TransactionToken* token) override {
93 return RetryingUtils::DeleteWithRetries(
94 [this, &fname, token]() {
95 return base_file_system_->DeleteFile(fname, token);
96 },
97 retry_config_);
98 }
99
CreateDir(const string & dirname,TransactionToken * token)100 Status CreateDir(const string& dirname, TransactionToken* token) override {
101 return RetryingUtils::CallWithRetries(
102 [this, &dirname, token]() {
103 return base_file_system_->CreateDir(dirname, token);
104 },
105 retry_config_);
106 }
107
DeleteDir(const string & dirname,TransactionToken * token)108 Status DeleteDir(const string& dirname, TransactionToken* token) override {
109 return RetryingUtils::DeleteWithRetries(
110 [this, &dirname, token]() {
111 return base_file_system_->DeleteDir(dirname, token);
112 },
113 retry_config_);
114 }
115
GetFileSize(const string & fname,TransactionToken * token,uint64 * file_size)116 Status GetFileSize(const string& fname, TransactionToken* token,
117 uint64* file_size) override {
118 return RetryingUtils::CallWithRetries(
119 [this, &fname, file_size, token]() {
120 return base_file_system_->GetFileSize(fname, token, file_size);
121 },
122 retry_config_);
123 }
124
RenameFile(const string & src,const string & target,TransactionToken * token)125 Status RenameFile(const string& src, const string& target,
126 TransactionToken* token) override {
127 return RetryingUtils::CallWithRetries(
128 [this, &src, &target, token]() {
129 return base_file_system_->RenameFile(src, target, token);
130 },
131 retry_config_);
132 }
133
IsDirectory(const string & dirname,TransactionToken * token)134 Status IsDirectory(const string& dirname, TransactionToken* token) override {
135 return RetryingUtils::CallWithRetries(
136 [this, &dirname, token]() {
137 return base_file_system_->IsDirectory(dirname, token);
138 },
139 retry_config_);
140 }
141
HasAtomicMove(const string & path,bool * has_atomic_move)142 Status HasAtomicMove(const string& path, bool* has_atomic_move) override {
143 // this method does not need to be retried
144 return base_file_system_->HasAtomicMove(path, has_atomic_move);
145 }
146
CanCreateTempFile(const std::string & fname,bool * can_create_temp_file)147 Status CanCreateTempFile(const std::string& fname,
148 bool* can_create_temp_file) override {
149 // this method does not need to be retried
150 return base_file_system_->CanCreateTempFile(fname, can_create_temp_file);
151 }
152
DeleteRecursively(const string & dirname,TransactionToken * token,int64_t * undeleted_files,int64_t * undeleted_dirs)153 Status DeleteRecursively(const string& dirname, TransactionToken* token,
154 int64_t* undeleted_files,
155 int64_t* undeleted_dirs) override {
156 return RetryingUtils::DeleteWithRetries(
157 [this, &dirname, token, undeleted_files, undeleted_dirs]() {
158 return base_file_system_->DeleteRecursively(
159 dirname, token, undeleted_files, undeleted_dirs);
160 },
161 retry_config_);
162 }
163
FlushCaches(TransactionToken * token)164 void FlushCaches(TransactionToken* token) override {
165 base_file_system_->FlushCaches(token);
166 }
167
underlying()168 Underlying* underlying() const { return base_file_system_.get(); }
169
170 private:
171 std::unique_ptr<Underlying> base_file_system_;
172 const RetryConfig retry_config_;
173
174 TF_DISALLOW_COPY_AND_ASSIGN(RetryingFileSystem);
175 };
176
177 namespace retrying_internals {
178
179 class RetryingRandomAccessFile : public RandomAccessFile {
180 public:
RetryingRandomAccessFile(std::unique_ptr<RandomAccessFile> base_file,const RetryConfig & retry_config)181 RetryingRandomAccessFile(std::unique_ptr<RandomAccessFile> base_file,
182 const RetryConfig& retry_config)
183 : base_file_(std::move(base_file)), retry_config_(retry_config) {}
184
Name(StringPiece * result)185 Status Name(StringPiece* result) const override {
186 return base_file_->Name(result);
187 }
188
Read(uint64 offset,size_t n,StringPiece * result,char * scratch)189 Status Read(uint64 offset, size_t n, StringPiece* result,
190 char* scratch) const override {
191 return RetryingUtils::CallWithRetries(
192 [this, offset, n, result, scratch]() {
193 return base_file_->Read(offset, n, result, scratch);
194 },
195 retry_config_);
196 }
197
198 private:
199 std::unique_ptr<RandomAccessFile> base_file_;
200 const RetryConfig retry_config_;
201 };
202
203 class RetryingWritableFile : public WritableFile {
204 public:
RetryingWritableFile(std::unique_ptr<WritableFile> base_file,const RetryConfig & retry_config)205 RetryingWritableFile(std::unique_ptr<WritableFile> base_file,
206 const RetryConfig& retry_config)
207 : base_file_(std::move(base_file)), retry_config_(retry_config) {}
208
~RetryingWritableFile()209 ~RetryingWritableFile() override {
210 // Makes sure the retrying version of Close() is called in the destructor.
211 Close().IgnoreError();
212 }
213
Append(StringPiece data)214 Status Append(StringPiece data) override {
215 return RetryingUtils::CallWithRetries(
216 [this, &data]() { return base_file_->Append(data); }, retry_config_);
217 }
Close()218 Status Close() override {
219 return RetryingUtils::CallWithRetries(
220 [this]() { return base_file_->Close(); }, retry_config_);
221 }
Flush()222 Status Flush() override {
223 return RetryingUtils::CallWithRetries(
224 [this]() { return base_file_->Flush(); }, retry_config_);
225 }
Name(StringPiece * result)226 Status Name(StringPiece* result) const override {
227 return base_file_->Name(result);
228 }
Sync()229 Status Sync() override {
230 return RetryingUtils::CallWithRetries(
231 [this]() { return base_file_->Sync(); }, retry_config_);
232 }
Tell(int64_t * position)233 Status Tell(int64_t* position) override {
234 return RetryingUtils::CallWithRetries(
235 [this, &position]() { return base_file_->Tell(position); },
236 retry_config_);
237 }
238
239 private:
240 std::unique_ptr<WritableFile> base_file_;
241 const RetryConfig retry_config_;
242 };
243
244 } // namespace retrying_internals
245
246 template <typename Underlying>
NewRandomAccessFile(const string & filename,TransactionToken * token,std::unique_ptr<RandomAccessFile> * result)247 Status RetryingFileSystem<Underlying>::NewRandomAccessFile(
248 const string& filename, TransactionToken* token,
249 std::unique_ptr<RandomAccessFile>* result) {
250 std::unique_ptr<RandomAccessFile> base_file;
251 TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
252 [this, &filename, &base_file, token]() {
253 return base_file_system_->NewRandomAccessFile(filename, token,
254 &base_file);
255 },
256 retry_config_));
257 result->reset(new retrying_internals::RetryingRandomAccessFile(
258 std::move(base_file), retry_config_));
259 return OkStatus();
260 }
261
262 template <typename Underlying>
NewWritableFile(const string & filename,TransactionToken * token,std::unique_ptr<WritableFile> * result)263 Status RetryingFileSystem<Underlying>::NewWritableFile(
264 const string& filename, TransactionToken* token,
265 std::unique_ptr<WritableFile>* result) {
266 std::unique_ptr<WritableFile> base_file;
267 TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
268 [this, &filename, &base_file, token]() {
269 return base_file_system_->NewWritableFile(filename, token, &base_file);
270 },
271 retry_config_));
272 result->reset(new retrying_internals::RetryingWritableFile(
273 std::move(base_file), retry_config_));
274 return OkStatus();
275 }
276
277 template <typename Underlying>
NewAppendableFile(const string & filename,TransactionToken * token,std::unique_ptr<WritableFile> * result)278 Status RetryingFileSystem<Underlying>::NewAppendableFile(
279 const string& filename, TransactionToken* token,
280 std::unique_ptr<WritableFile>* result) {
281 std::unique_ptr<WritableFile> base_file;
282 TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
283 [this, &filename, &base_file, token]() {
284 return base_file_system_->NewAppendableFile(filename, token,
285 &base_file);
286 },
287 retry_config_));
288 result->reset(new retrying_internals::RetryingWritableFile(
289 std::move(base_file), retry_config_));
290 return OkStatus();
291 }
292
293 template <typename Underlying>
NewReadOnlyMemoryRegionFromFile(const string & filename,TransactionToken * token,std::unique_ptr<ReadOnlyMemoryRegion> * result)294 Status RetryingFileSystem<Underlying>::NewReadOnlyMemoryRegionFromFile(
295 const string& filename, TransactionToken* token,
296 std::unique_ptr<ReadOnlyMemoryRegion>* result) {
297 return RetryingUtils::CallWithRetries(
298 [this, &filename, result, token]() {
299 return base_file_system_->NewReadOnlyMemoryRegionFromFile(
300 filename, token, result);
301 },
302 retry_config_);
303 }
304
305 } // namespace tensorflow
306
307 #endif // TENSORFLOW_CORE_PLATFORM_CLOUD_RETRYING_FILE_SYSTEM_H_
308