xref: /aosp_15_r20/external/federated-compute/fcp/tensorflow/file_descriptor_filesystem.cc (revision 14675a029014e728ec732f129a32e299b2da0601)
1 /*
2  * Copyright 2023 Google LLC
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
#include "fcp/tensorflow/file_descriptor_filesystem.h"

#include <errno.h>
#include <sys/stat.h>
#include <unistd.h>

#include <algorithm>
#include <cstdint>
#include <limits>
#include <memory>
#include <vector>

#include "absl/memory/memory.h"
#include "absl/strings/numbers.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/strings/strip.h"
#include "fcp/base/monitoring.h"
#include "tensorflow/core/platform/env.h"
34 
35 namespace tensorflow {
36 
37 namespace fcp {
38 
39 using ::tensorflow::Status;
40 
// URI prefix handled by this filesystem: the "fd" scheme with an empty
// authority, followed by the numeric descriptor (e.g. "fd:///3").
constexpr char kFdFilesystemPrefix[] = "fd:///";
// Largest number of bytes handed to a single write() call (64 KiB).
constexpr size_t kMaxWriteChunkSize = size_t{64} * 1024;
43 
44 namespace {
45 // Copied from base implementation in
46 // //external/tensorflow/tensorflow/tsl/platform/default/posix_file_system.cc
ReadBytesFromFd(int fd,uint64_t offset,size_t n,absl::string_view * result,char * scratch)47 absl::Status ReadBytesFromFd(int fd, uint64_t offset, size_t n,
48                              absl::string_view* result, char* scratch) {
49   absl::Status s;
50   char* dst = scratch;
51   while (n > 0 && s.ok()) {
52     ssize_t r = pread(fd, dst, n, static_cast<off_t>(offset));
53     if (r > 0) {
54       dst += r;
55       n -= r;
56       offset += r;
57     } else if (r == 0) {
58       s = absl::OutOfRangeError(absl::StrCat(
59           "Read fewer bytes than requested. Total read bytes ", offset));
60     } else if (errno == EINTR || errno == EAGAIN) {
61       // Retry
62     } else {
63       s = absl::UnknownError(absl::StrCat("Failed to read: errno ", errno));
64     }
65   }
66   *result = absl::string_view(scratch, dst - scratch);
67   return s;
68 }
69 
70 class FdRandomAccessFile : public RandomAccessFile {
71  public:
FdRandomAccessFile(int fd)72   explicit FdRandomAccessFile(int fd) : fd_(fd) {}
73 
~FdRandomAccessFile()74   ~FdRandomAccessFile() override { close(fd_); }
75 
Read(uint64 offset,size_t n,StringPiece * result,char * scratch) const76   Status Read(uint64 offset, size_t n, StringPiece* result,
77               char* scratch) const override {
78     absl::string_view sv;
79     absl::Status s = ReadBytesFromFd(fd_, offset, n, &sv, scratch);
80     if (s.ok()) {
81       *result = StringPiece(sv.data(), sv.size());
82       return Status();
83     } else {
84       return Status(static_cast<tensorflow::errors::Code>(s.code()),
85                     s.message());
86     }
87   }
88 
89  private:
90   int fd_;
91 };
92 
93 class FdWritableFile : public WritableFile {
94  public:
FdWritableFile(int fd)95   explicit FdWritableFile(int fd) : fd_(fd) {}
96 
~FdWritableFile()97   ~FdWritableFile() override { Close(); }
98 
Append(StringPiece data)99   Status Append(StringPiece data) override {
100     return Write(data.data(), data.size());
101   }
102 
Close()103   Status Close() override {
104     if (has_closed_) return Status();
105     close(fd_);
106     has_closed_ = true;
107     return Status();
108   }
Flush()109   Status Flush() override { return Status(); }
Sync()110   Status Sync() override { return Status(); }
111 
112  private:
Write(const void * data,size_t data_size)113   Status Write(const void* data, size_t data_size) {
114     size_t write_len = data_size;
115     do {
116       size_t chunk_size = std::min<size_t>(write_len, kMaxWriteChunkSize);
117       ssize_t wrote = write(fd_, data, chunk_size);
118       if (wrote < 0) {
119         return Status(tensorflow::error::Code::UNKNOWN,
120                       absl::StrCat("Failed to write: ", errno));
121       }
122       data = static_cast<const uint8_t*>(data) + wrote;
123       write_len -= wrote;
124     } while (write_len > 0);
125     return Status();
126   }
127   int fd_;
128   bool has_closed_ = false;
129 };
130 
131 // Gets the file descriptor in the URI.
GetFd(absl::string_view fname,int * result)132 Status GetFd(absl::string_view fname, int* result) {
133   // Consume scheme and empty authority (fd:///)
134   if (!absl::ConsumePrefix(&fname, kFdFilesystemPrefix)) {
135     return errors::InvalidArgument("Bad uri: ", fname);
136   }
137 
138   // Try to parse remainder of path as an integer fd
139   if (!absl::SimpleAtoi(fname, result)) {
140     return errors::InvalidArgument("Bad path: ", fname);
141   }
142 
143   return OkStatus();
144 }
145 
146 }  // anonymous namespace
147 
NewRandomAccessFile(const string & filename,std::unique_ptr<RandomAccessFile> * result)148 Status FileDescriptorFileSystem::NewRandomAccessFile(
149     const string& filename, std::unique_ptr<RandomAccessFile>* result) {
150   int fd;
151   TF_RETURN_IF_ERROR(GetFd(filename, &fd));
152   FileStatistics stat;
153   TF_RETURN_IF_ERROR(Stat(filename, &stat));  // check against directory FD
154 
155   int dup_fd = dup(fd);
156   if (dup_fd == -1) {
157     return errors::Unknown("Failed to dup: errno ", errno);
158   }
159 
160   *result = std::make_unique<FdRandomAccessFile>(dup_fd);
161   return OkStatus();
162 }
163 
GetMatchingPaths(const string & pattern,std::vector<string> * results)164 Status FileDescriptorFileSystem::GetMatchingPaths(
165     const string& pattern, std::vector<string>* results) {
166   results->clear();
167   FileStatistics statistics;
168   if (Stat(pattern, &statistics).ok()) {
169     results->push_back(pattern);
170   }
171   return OkStatus();
172 }
173 
Stat(const string & fname,FileStatistics * stats)174 Status FileDescriptorFileSystem::Stat(const string& fname,
175                                       FileStatistics* stats) {
176   if (stats == nullptr) {
177     return errors::InvalidArgument("FileStatistics pointer must not be NULL");
178   }
179 
180   int fd;
181   TF_RETURN_IF_ERROR(GetFd(fname, &fd));
182 
183   struct stat st;
184   if (fstat(fd, &st) == -1) {
185     return errors::Unknown("Failed to fstat: errno ", errno);
186   }
187 
188   if (S_ISDIR(st.st_mode)) {
189     return errors::NotFound("File not found: is a directory");
190   }
191   stats->length = st.st_size;
192   stats->mtime_nsec = st.st_mtime * 1e9;
193   stats->is_directory = S_ISDIR(st.st_mode);
194 
195   return OkStatus();
196 }
197 
GetFileSize(const string & fname,uint64 * size)198 Status FileDescriptorFileSystem::GetFileSize(const string& fname,
199                                              uint64* size) {
200   FileStatistics stat;
201   TF_RETURN_IF_ERROR(Stat(fname, &stat));
202   *size = stat.length;
203   return OkStatus();
204 }
205 
NewReadOnlyMemoryRegionFromFile(const string & filename,std::unique_ptr<ReadOnlyMemoryRegion> * result)206 Status FileDescriptorFileSystem::NewReadOnlyMemoryRegionFromFile(
207     const string& filename, std::unique_ptr<ReadOnlyMemoryRegion>* result) {
208   return errors::Unimplemented("Not implemented by the fd filesystem");
209 }
210 
FileExists(const string & fname)211 Status FileDescriptorFileSystem::FileExists(const string& fname) {
212   return errors::Unimplemented("Not implemented by the fd filesystem");
213 }
214 
GetChildren(const string & dir,std::vector<string> * r)215 Status FileDescriptorFileSystem::GetChildren(const string& dir,
216                                              std::vector<string>* r) {
217   return errors::Unimplemented("fd filesystem is non-hierarchical");
218 }
219 
NewWritableFile(const string & fname,std::unique_ptr<WritableFile> * result)220 Status FileDescriptorFileSystem::NewWritableFile(
221     const string& fname, std::unique_ptr<WritableFile>* result) {
222   int fd;
223   TF_RETURN_IF_ERROR(GetFd(fname, &fd));
224   FileStatistics stat;
225   TF_RETURN_IF_ERROR(Stat(fname, &stat));  // check against directory FD
226 
227   int dup_fd = dup(fd);
228   if (dup_fd == -1) {
229     return errors::Unknown("Failed to dup: errno ", errno);
230   }
231 
232   *result = std::make_unique<FdWritableFile>(dup_fd);
233   return OkStatus();
234 }
235 
NewAppendableFile(const string & fname,std::unique_ptr<WritableFile> * result)236 Status FileDescriptorFileSystem::NewAppendableFile(
237     const string& fname, std::unique_ptr<WritableFile>* result) {
238   return errors::Unimplemented("Not implemented by the fd filesystem");
239 }
240 
DeleteFile(const string & f)241 Status FileDescriptorFileSystem::DeleteFile(const string& f) {
242   return errors::Unimplemented("Not implemented by the fd filesystem");
243 }
244 
CreateDir(const string & d)245 Status FileDescriptorFileSystem::CreateDir(const string& d) {
246   return errors::Unimplemented("Not implemented by the fd filesystem");
247 }
248 
DeleteDir(const string & d)249 Status FileDescriptorFileSystem::DeleteDir(const string& d) {
250   return errors::Unimplemented("Not implemented by the fd filesystem");
251 }
252 
RenameFile(const string & s,const string & t)253 Status FileDescriptorFileSystem::RenameFile(const string& s, const string& t) {
254   return errors::Unimplemented("Not implemented by the fd filesystem");
255 }
256 
CanCreateTempFile(const std::string & fname,bool * can_create_temp_file)257 Status FileDescriptorFileSystem::CanCreateTempFile(const std::string& fname,
258                                                    bool* can_create_temp_file) {
259   *can_create_temp_file = false;
260   return OkStatus();
261 }
262 
// Registers FileDescriptorFileSystem as the handler for the "fd" scheme,
// i.e. paths of the form fd:///<N>.
REGISTER_FILE_SYSTEM("fd", FileDescriptorFileSystem);
264 
265 }  // namespace fcp
266 }  // namespace tensorflow
267