1 /*
2 * Copyright 2023 Google LLC
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "fcp/tensorflow/file_descriptor_filesystem.h"
18
19 #include <errno.h>
20 #include <sys/stat.h>
21 #include <unistd.h>
22
23 #include <algorithm>
24 #include <memory>
25 #include <vector>
26
27 #include "absl/memory/memory.h"
28 #include "absl/strings/numbers.h"
29 #include "absl/strings/str_cat.h"
30 #include "absl/strings/string_view.h"
31 #include "absl/strings/strip.h"
32 #include "fcp/base/monitoring.h"
33 #include "tensorflow/core/platform/env.h"
34
35 namespace tensorflow {
36
37 namespace fcp {
38
39 using ::tensorflow::Status;
40
// URI prefix (scheme plus empty authority) that GetFd() strips before parsing
// the remainder as a file descriptor number.
static constexpr char kFdFilesystemPrefix[] = "fd:///";
// Largest number of bytes FdWritableFile hands to a single write() call.
static constexpr size_t kMaxWriteChunkSize = 64u * 1024;  // 64KB
43
44 namespace {
45 // Copied from base implementation in
46 // //external/tensorflow/tensorflow/tsl/platform/default/posix_file_system.cc
ReadBytesFromFd(int fd,uint64_t offset,size_t n,absl::string_view * result,char * scratch)47 absl::Status ReadBytesFromFd(int fd, uint64_t offset, size_t n,
48 absl::string_view* result, char* scratch) {
49 absl::Status s;
50 char* dst = scratch;
51 while (n > 0 && s.ok()) {
52 ssize_t r = pread(fd, dst, n, static_cast<off_t>(offset));
53 if (r > 0) {
54 dst += r;
55 n -= r;
56 offset += r;
57 } else if (r == 0) {
58 s = absl::OutOfRangeError(absl::StrCat(
59 "Read fewer bytes than requested. Total read bytes ", offset));
60 } else if (errno == EINTR || errno == EAGAIN) {
61 // Retry
62 } else {
63 s = absl::UnknownError(absl::StrCat("Failed to read: errno ", errno));
64 }
65 }
66 *result = absl::string_view(scratch, dst - scratch);
67 return s;
68 }
69
70 class FdRandomAccessFile : public RandomAccessFile {
71 public:
FdRandomAccessFile(int fd)72 explicit FdRandomAccessFile(int fd) : fd_(fd) {}
73
~FdRandomAccessFile()74 ~FdRandomAccessFile() override { close(fd_); }
75
Read(uint64 offset,size_t n,StringPiece * result,char * scratch) const76 Status Read(uint64 offset, size_t n, StringPiece* result,
77 char* scratch) const override {
78 absl::string_view sv;
79 absl::Status s = ReadBytesFromFd(fd_, offset, n, &sv, scratch);
80 if (s.ok()) {
81 *result = StringPiece(sv.data(), sv.size());
82 return Status();
83 } else {
84 return Status(static_cast<tensorflow::errors::Code>(s.code()),
85 s.message());
86 }
87 }
88
89 private:
90 int fd_;
91 };
92
93 class FdWritableFile : public WritableFile {
94 public:
FdWritableFile(int fd)95 explicit FdWritableFile(int fd) : fd_(fd) {}
96
~FdWritableFile()97 ~FdWritableFile() override { Close(); }
98
Append(StringPiece data)99 Status Append(StringPiece data) override {
100 return Write(data.data(), data.size());
101 }
102
Close()103 Status Close() override {
104 if (has_closed_) return Status();
105 close(fd_);
106 has_closed_ = true;
107 return Status();
108 }
Flush()109 Status Flush() override { return Status(); }
Sync()110 Status Sync() override { return Status(); }
111
112 private:
Write(const void * data,size_t data_size)113 Status Write(const void* data, size_t data_size) {
114 size_t write_len = data_size;
115 do {
116 size_t chunk_size = std::min<size_t>(write_len, kMaxWriteChunkSize);
117 ssize_t wrote = write(fd_, data, chunk_size);
118 if (wrote < 0) {
119 return Status(tensorflow::error::Code::UNKNOWN,
120 absl::StrCat("Failed to write: ", errno));
121 }
122 data = static_cast<const uint8_t*>(data) + wrote;
123 write_len -= wrote;
124 } while (write_len > 0);
125 return Status();
126 }
127 int fd_;
128 bool has_closed_ = false;
129 };
130
131 // Gets the file descriptor in the URI.
GetFd(absl::string_view fname,int * result)132 Status GetFd(absl::string_view fname, int* result) {
133 // Consume scheme and empty authority (fd:///)
134 if (!absl::ConsumePrefix(&fname, kFdFilesystemPrefix)) {
135 return errors::InvalidArgument("Bad uri: ", fname);
136 }
137
138 // Try to parse remainder of path as an integer fd
139 if (!absl::SimpleAtoi(fname, result)) {
140 return errors::InvalidArgument("Bad path: ", fname);
141 }
142
143 return OkStatus();
144 }
145
146 } // anonymous namespace
147
NewRandomAccessFile(const string & filename,std::unique_ptr<RandomAccessFile> * result)148 Status FileDescriptorFileSystem::NewRandomAccessFile(
149 const string& filename, std::unique_ptr<RandomAccessFile>* result) {
150 int fd;
151 TF_RETURN_IF_ERROR(GetFd(filename, &fd));
152 FileStatistics stat;
153 TF_RETURN_IF_ERROR(Stat(filename, &stat)); // check against directory FD
154
155 int dup_fd = dup(fd);
156 if (dup_fd == -1) {
157 return errors::Unknown("Failed to dup: errno ", errno);
158 }
159
160 *result = std::make_unique<FdRandomAccessFile>(dup_fd);
161 return OkStatus();
162 }
163
GetMatchingPaths(const string & pattern,std::vector<string> * results)164 Status FileDescriptorFileSystem::GetMatchingPaths(
165 const string& pattern, std::vector<string>* results) {
166 results->clear();
167 FileStatistics statistics;
168 if (Stat(pattern, &statistics).ok()) {
169 results->push_back(pattern);
170 }
171 return OkStatus();
172 }
173
Stat(const string & fname,FileStatistics * stats)174 Status FileDescriptorFileSystem::Stat(const string& fname,
175 FileStatistics* stats) {
176 if (stats == nullptr) {
177 return errors::InvalidArgument("FileStatistics pointer must not be NULL");
178 }
179
180 int fd;
181 TF_RETURN_IF_ERROR(GetFd(fname, &fd));
182
183 struct stat st;
184 if (fstat(fd, &st) == -1) {
185 return errors::Unknown("Failed to fstat: errno ", errno);
186 }
187
188 if (S_ISDIR(st.st_mode)) {
189 return errors::NotFound("File not found: is a directory");
190 }
191 stats->length = st.st_size;
192 stats->mtime_nsec = st.st_mtime * 1e9;
193 stats->is_directory = S_ISDIR(st.st_mode);
194
195 return OkStatus();
196 }
197
GetFileSize(const string & fname,uint64 * size)198 Status FileDescriptorFileSystem::GetFileSize(const string& fname,
199 uint64* size) {
200 FileStatistics stat;
201 TF_RETURN_IF_ERROR(Stat(fname, &stat));
202 *size = stat.length;
203 return OkStatus();
204 }
205
NewReadOnlyMemoryRegionFromFile(const string & filename,std::unique_ptr<ReadOnlyMemoryRegion> * result)206 Status FileDescriptorFileSystem::NewReadOnlyMemoryRegionFromFile(
207 const string& filename, std::unique_ptr<ReadOnlyMemoryRegion>* result) {
208 return errors::Unimplemented("Not implemented by the fd filesystem");
209 }
210
FileExists(const string & fname)211 Status FileDescriptorFileSystem::FileExists(const string& fname) {
212 return errors::Unimplemented("Not implemented by the fd filesystem");
213 }
214
GetChildren(const string & dir,std::vector<string> * r)215 Status FileDescriptorFileSystem::GetChildren(const string& dir,
216 std::vector<string>* r) {
217 return errors::Unimplemented("fd filesystem is non-hierarchical");
218 }
219
NewWritableFile(const string & fname,std::unique_ptr<WritableFile> * result)220 Status FileDescriptorFileSystem::NewWritableFile(
221 const string& fname, std::unique_ptr<WritableFile>* result) {
222 int fd;
223 TF_RETURN_IF_ERROR(GetFd(fname, &fd));
224 FileStatistics stat;
225 TF_RETURN_IF_ERROR(Stat(fname, &stat)); // check against directory FD
226
227 int dup_fd = dup(fd);
228 if (dup_fd == -1) {
229 return errors::Unknown("Failed to dup: errno ", errno);
230 }
231
232 *result = std::make_unique<FdWritableFile>(dup_fd);
233 return OkStatus();
234 }
235
NewAppendableFile(const string & fname,std::unique_ptr<WritableFile> * result)236 Status FileDescriptorFileSystem::NewAppendableFile(
237 const string& fname, std::unique_ptr<WritableFile>* result) {
238 return errors::Unimplemented("Not implemented by the fd filesystem");
239 }
240
DeleteFile(const string & f)241 Status FileDescriptorFileSystem::DeleteFile(const string& f) {
242 return errors::Unimplemented("Not implemented by the fd filesystem");
243 }
244
CreateDir(const string & d)245 Status FileDescriptorFileSystem::CreateDir(const string& d) {
246 return errors::Unimplemented("Not implemented by the fd filesystem");
247 }
248
DeleteDir(const string & d)249 Status FileDescriptorFileSystem::DeleteDir(const string& d) {
250 return errors::Unimplemented("Not implemented by the fd filesystem");
251 }
252
RenameFile(const string & s,const string & t)253 Status FileDescriptorFileSystem::RenameFile(const string& s, const string& t) {
254 return errors::Unimplemented("Not implemented by the fd filesystem");
255 }
256
CanCreateTempFile(const std::string & fname,bool * can_create_temp_file)257 Status FileDescriptorFileSystem::CanCreateTempFile(const std::string& fname,
258 bool* can_create_temp_file) {
259 *can_create_temp_file = false;
260 return OkStatus();
261 }
262
// Registers FileDescriptorFileSystem as the handler for the "fd" URI scheme,
// i.e. for the fd:///<number> paths parsed by GetFd().
REGISTER_FILE_SYSTEM("fd", FileDescriptorFileSystem);
264
265 } // namespace fcp
266 } // namespace tensorflow
267