1 /* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 #include "tensorflow/core/common_runtime/process_function_library_runtime.h"
16
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
20
21 #include "tensorflow/core/common_runtime/composite_device.h"
22 #include "tensorflow/core/common_runtime/device_factory.h"
23 #include "tensorflow/core/common_runtime/device_mgr.h"
24 #include "tensorflow/core/common_runtime/function_testlib.h"
25 #include "tensorflow/core/common_runtime/rendezvous_mgr.h"
26 #include "tensorflow/core/framework/function.h"
27 #include "tensorflow/core/framework/function_testlib.h"
28 #include "tensorflow/core/framework/metrics.h"
29 #include "tensorflow/core/framework/op_kernel.h"
30 #include "tensorflow/core/framework/resource_var.h"
31 #include "tensorflow/core/framework/tensor_testutil.h"
32 #include "tensorflow/core/framework/type_index.h"
33 #include "tensorflow/core/framework/types.pb.h"
34 #include "tensorflow/core/lib/core/errors.h"
35 #include "tensorflow/core/lib/core/status_test_util.h"
36 #include "tensorflow/core/lib/core/threadpool.h"
37 #include "tensorflow/core/lib/strings/str_util.h"
38 #include "tensorflow/core/platform/test.h"
39 #include "tensorflow/core/protobuf/config.pb.h"
40 #include "tensorflow/core/public/session_options.h"
41 #include "tensorflow/core/public/version.h"
42
43 #if GOOGLE_CUDA
44 #include "third_party/gpus/cuda/include/cuda.h"
45 #include "third_party/gpus/cuda/include/cuda_runtime_api.h"
46 #elif TENSORFLOW_USE_ROCM
47 #include "rocm/include/hip/hip_runtime.h"
48 #endif // GOOGLE_CUDA
49
50 namespace tensorflow {
51 namespace {
52
// A minimal fake DistributedFunctionLibraryRuntime. Instantiate() hands out
// monotonically increasing local handles and succeeds immediately; Run() and
// CleanUp() are no-ops. This lets the tests below exercise
// ProcessFunctionLibraryRuntime's cross-process (cluster) code paths without
// a real remote cluster.
class TestClusterFLR : public DistributedFunctionLibraryRuntime {
 public:
  explicit TestClusterFLR(DeviceMgr* device_mgr) : device_mgr_(device_mgr) {}

  // Assigns the next unused handle and reports success. Handle assignment is
  // guarded by mu_ so concurrent instantiation (see ClusterFLRParallelTest)
  // is safe.
  void Instantiate(const string& function_name,
                   const FunctionLibraryDefinition& lib_def, AttrSlice attrs,
                   const FunctionLibraryRuntime::InstantiateOptions& options,
                   FunctionLibraryRuntime::LocalHandle* handle,
                   FunctionLibraryRuntime::DoneCallback done) override {
    {
      mutex_lock l(mu_);
      *handle = next_handle_;
      next_handle_++;
    }
    done(OkStatus());
  }

  // Remote execution is not exercised by these tests; both Run() overloads do
  // nothing (note: `done` is intentionally never invoked).
  void Run(const FunctionLibraryRuntime::Options& opts,
           FunctionLibraryRuntime::LocalHandle handle,
           gtl::ArraySlice<Tensor> args, std::vector<Tensor>* rets,
           FunctionLibraryRuntime::DoneCallback done) override {}

  void Run(const FunctionLibraryRuntime::Options& opts,
           FunctionLibraryRuntime::LocalHandle handle,
           gtl::ArraySlice<FunctionArg> args, std::vector<FunctionRet>* rets,
           FunctionLibraryRuntime::DoneCallback done) override {}

  void CleanUp(uint64 step_id, FunctionLibraryRuntime::LocalHandle handle,
               FunctionLibraryRuntime::DoneCallback done) override {}

  DeviceMgr* remote_device_mgr() const override { return device_mgr_; }

 private:
  mutex mu_;
  // Next handle to hand out; handles are sequential starting at 0.
  int next_handle_ TF_GUARDED_BY(mu_) = 0;
  DeviceMgr* device_mgr_;  // Not owned.
};
90
GenerateSessionMetadata()91 SessionMetadata GenerateSessionMetadata() {
92 SessionMetadata session_metadata;
93 session_metadata.set_name("name");
94 session_metadata.set_version(42);
95 return session_metadata;
96 }
97
98 // TODO(b/128707168): Tests requiring a GPU device are currently always skipped
99 // because the check for whether a GPU device is present happens before the GPU
100 // device is set up.
101 class ProcessFunctionLibraryRuntimeTest : public ::testing::Test {
102 public:
ProcessFunctionLibraryRuntimeTest()103 ProcessFunctionLibraryRuntimeTest() {
104 SessionOptions options;
105 auto* device_count = options.config.mutable_device_count();
106 device_count->insert({"CPU", 3});
107 std::vector<std::unique_ptr<Device>> created_devices;
108 TF_CHECK_OK(DeviceFactory::AddDevices(options, "/job:a/replica:0/task:0",
109 &created_devices));
110 // Do not add CPU:2 to device manager. Used for removed device testing.
111 device2_ = std::move(created_devices[2]);
112 created_devices.erase(created_devices.begin() + 2);
113
114 device_mgr_ = std::make_unique<DynamicDeviceMgr>();
115 TF_CHECK_OK(device_mgr_->AddDevices(std::move(created_devices)));
116 TF_CHECK_OK(device_mgr_->LookupDevice(
117 "/job:a/replica:0/task:0/device:CPU:0", &device0_));
118 TF_CHECK_OK(device_mgr_->LookupDevice(
119 "/job:a/replica:0/task:0/device:CPU:1", &device1_));
120 Device* device2_ptr = nullptr;
121 EXPECT_NE(
122 error::OK,
123 device_mgr_
124 ->LookupDevice("/job:a/replica:0/task:0/device:CPU:2", &device2_ptr)
125 .code());
126 // If no GPU is available, gpu_device_ will remain nullptr.
127 Status status = device_mgr_->LookupDevice(
128 "/job:a/replica:0/task:0/device:GPU:0", &gpu_device_);
129 if (!status.ok()) {
130 CHECK_EQ(nullptr, gpu_device_);
131 }
132 }
133
Init(const std::vector<FunctionDef> & flib,const SessionMetadata * session_metadata=nullptr)134 void Init(const std::vector<FunctionDef>& flib,
135 const SessionMetadata* session_metadata = nullptr) {
136 FunctionDefLibrary proto;
137 for (const auto& fdef : flib) *(proto.add_function()) = fdef;
138 lib_def_.reset(new FunctionLibraryDefinition(OpRegistry::Global(), proto));
139 OptimizerOptions opts;
140 cluster_flr_.reset(new TestClusterFLR(device_mgr_.get()));
141 proc_flr_.reset(new ProcessFunctionLibraryRuntime(
142 device_mgr_.get(), Env::Default(), /*config=*/nullptr,
143 TF_GRAPH_DEF_VERSION, lib_def_.get(), opts,
144 /*thread_pool=*/nullptr, cluster_flr_.get(), session_metadata,
145 Rendezvous::Factory{
146 [this](const int64_t step_id, const DeviceMgr* device_mgr,
147 Rendezvous** r) {
148 *r = new IntraProcessRendezvous(device_mgr);
149 if (rendezvous_ref_counts_.find(step_id) !=
150 rendezvous_ref_counts_.end()) {
151 rendezvous_ref_counts_[step_id]++;
152 } else {
153 rendezvous_ref_counts_[step_id] = 1;
154 }
155 return OkStatus();
156 },
157 [this](const int64_t step_id) {
158 CHECK(rendezvous_ref_counts_.find(step_id) !=
159 rendezvous_ref_counts_.end());
160 rendezvous_ref_counts_[step_id]--;
161 return OkStatus();
162 }}));
163 }
164
AddCompositeDevice(CompositeDevice * d)165 void AddCompositeDevice(CompositeDevice* d) {
166 proc_flr_->AddCompositeDevice(d);
167 }
168
Instantiate(const string & name,test::function::Attrs attrs,const FunctionLibraryRuntime::InstantiateOptions & instantiate_opts,FunctionLibraryRuntime::Handle * handle)169 Status Instantiate(
170 const string& name, test::function::Attrs attrs,
171 const FunctionLibraryRuntime::InstantiateOptions& instantiate_opts,
172 FunctionLibraryRuntime::Handle* handle) {
173 return proc_flr_->Instantiate(name, attrs, instantiate_opts, handle);
174 }
175
GPUToCPU(const Tensor & device_tensor)176 Tensor GPUToCPU(const Tensor& device_tensor) {
177 #if GOOGLE_CUDA || TENSORFLOW_USE_ROCM
178 CHECK(gpu_device_);
179 CHECK(gpu_device_->tensorflow_accelerator_device_info() != nullptr);
180 DeviceContext* device_context =
181 gpu_device_->tensorflow_accelerator_device_info()->default_context;
182
183 Tensor cpu_tensor(device_tensor.dtype(), device_tensor.shape());
184 CHECK(device_context
185 ->CopyDeviceTensorToCPUSync(&device_tensor, "", gpu_device_,
186 &cpu_tensor)
187 .ok());
188 return cpu_tensor;
189 #else
190 CHECK(false);
191 #endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
192 }
193
CPUToGPU(const Tensor & cpu_tensor)194 Tensor CPUToGPU(const Tensor& cpu_tensor) {
195 #if GOOGLE_CUDA || TENSORFLOW_USE_ROCM
196 CHECK(gpu_device_);
197 CHECK(gpu_device_->tensorflow_accelerator_device_info() != nullptr);
198 DeviceContext* device_context =
199 gpu_device_->tensorflow_accelerator_device_info()->default_context;
200
201 Tensor device_tensor(gpu_device_->GetAllocator({}), cpu_tensor.dtype(),
202 cpu_tensor.shape(), {});
203 CHECK(device_context
204 ->CopyCPUTensorToDeviceSync(&cpu_tensor, gpu_device_,
205 &device_tensor)
206 .ok());
207 return device_tensor;
208 #else
209 CHECK(false);
210 #endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
211 }
212
213 template <typename T, typename K>
RunWithRuntime(const string & name,FunctionLibraryRuntime::Options opts,test::function::Attrs attrs,const FunctionLibraryRuntime::InstantiateOptions & instantiate_opts,const T & args,std::vector<K * > rets,ProcessFunctionLibraryRuntime * pflr)214 Status RunWithRuntime(
215 const string& name, FunctionLibraryRuntime::Options opts,
216 test::function::Attrs attrs,
217 const FunctionLibraryRuntime::InstantiateOptions& instantiate_opts,
218 const T& args, std::vector<K*> rets,
219 ProcessFunctionLibraryRuntime* pflr) {
220 FunctionLibraryRuntime::Handle handle;
221 Status status = pflr->Instantiate(name, attrs, instantiate_opts, &handle);
222 if (!status.ok()) {
223 return status;
224 }
225 bool is_cross_process = false;
226 TF_CHECK_OK(pflr->IsCrossProcess(handle, &is_cross_process));
227 EXPECT_FALSE(is_cross_process);
228
229 std::function<void(std::function<void()>)> runner =
230 [](std::function<void()> fn) {
231 test::function::FunctionTestSchedClosure(fn);
232 };
233 Notification done;
234 opts.runner = &runner;
235 std::vector<K> out;
236 pflr->Run(opts, handle, args, &out, [&status, &done](const Status& s) {
237 status = s;
238 done.Notify();
239 });
240 done.WaitForNotification();
241 if (!status.ok()) {
242 return status;
243 }
244 CHECK_EQ(rets.size(), out.size());
245 for (size_t i = 0; i < rets.size(); ++i) {
246 *rets[i] = out[i];
247 }
248
249 // Release the handle and then try running the function. It shouldn't
250 // succeed.
251 status = pflr->ReleaseHandle(handle);
252 if (!status.ok()) {
253 return status;
254 }
255 Notification done2;
256 pflr->Run(opts, handle, args, &out, [&status, &done2](const Status& s) {
257 status = s;
258 done2.Notify();
259 });
260 done2.WaitForNotification();
261 EXPECT_TRUE(errors::IsNotFound(status)) << "Actual status: " << status;
262 EXPECT_TRUE(absl::StrContains(status.error_message(), "not found."));
263
264 return OkStatus();
265 }
266
Run(const string & name,FunctionLibraryRuntime::Options opts,test::function::Attrs attrs,const FunctionLibraryRuntime::InstantiateOptions & instantiate_opts,const std::vector<Tensor> & args,std::vector<Tensor * > rets,ProcessFunctionLibraryRuntime * pflr=nullptr)267 Status Run(const string& name, FunctionLibraryRuntime::Options opts,
268 test::function::Attrs attrs,
269 const FunctionLibraryRuntime::InstantiateOptions& instantiate_opts,
270 const std::vector<Tensor>& args, std::vector<Tensor*> rets,
271 ProcessFunctionLibraryRuntime* pflr = nullptr) {
272 return RunWithRuntime<std::vector<Tensor>, Tensor>(
273 name, opts, attrs, instantiate_opts, args, rets, proc_flr_.get());
274 }
275
RunWithPackedArgs(const string & name,FunctionLibraryRuntime::Options opts,test::function::Attrs attrs,const FunctionLibraryRuntime::InstantiateOptions & instantiate_opts,const FunctionArgsInterface & args,std::vector<FunctionRet * > rets,ProcessFunctionLibraryRuntime * pflr=nullptr)276 Status RunWithPackedArgs(
277 const string& name, FunctionLibraryRuntime::Options opts,
278 test::function::Attrs attrs,
279 const FunctionLibraryRuntime::InstantiateOptions& instantiate_opts,
280 const FunctionArgsInterface& args, std::vector<FunctionRet*> rets,
281 ProcessFunctionLibraryRuntime* pflr = nullptr) {
282 return RunWithRuntime<FunctionArgsInterface, FunctionRet>(
283 name, opts, attrs, instantiate_opts, args, rets, proc_flr_.get());
284 }
285
RunInstantiated(FunctionLibraryRuntime::Handle handle,FunctionLibraryRuntime::Options opts,const std::vector<Tensor> & args,std::vector<Tensor * > rets)286 Status RunInstantiated(FunctionLibraryRuntime::Handle handle,
287 FunctionLibraryRuntime::Options opts,
288 const std::vector<Tensor>& args,
289 std::vector<Tensor*> rets) {
290 std::function<void(std::function<void()>)> runner =
291 [](std::function<void()> fn) {
292 test::function::FunctionTestSchedClosure(fn);
293 };
294
295 opts.runner = &runner;
296 Status status;
297 Notification done;
298 std::vector<Tensor> out;
299 proc_flr_->Run(opts, handle, args, &out, [&status, &done](const Status& s) {
300 status = s;
301 done.Notify();
302 });
303 done.WaitForNotification();
304 if (!status.ok()) {
305 return status;
306 }
307 CHECK_EQ(rets.size(), out.size());
308 for (size_t i = 0; i < rets.size(); ++i) {
309 *rets[i] = out[i];
310 }
311 return OkStatus();
312 }
313
314 std::unique_ptr<DynamicDeviceMgr> device_mgr_;
315 Device* device0_ = nullptr; // Not owned. (Owned by device_mgr_.)
316 Device* device1_ = nullptr; // Not owned. (Owned by device_mgr_.)
317 std::unique_ptr<Device> device2_;
318 // Remains as nullptr if no GPU is available.
319 Device* gpu_device_ = nullptr; // Not owned. (Owned by device_mgr_.)
320 std::unique_ptr<FunctionLibraryDefinition> lib_def_;
321 std::unique_ptr<TestClusterFLR> cluster_flr_;
322 std::unique_ptr<ProcessFunctionLibraryRuntime> proc_flr_;
323
324 // To ensure that we are cleaning up the rendezvous properly.
325 std::unordered_map<int64_t, int> rendezvous_ref_counts_;
326 };
327
TEST_F(ProcessFunctionLibraryRuntimeTest, GetFLRNull) {
  // Build a runtime with no DeviceMgr at all.
  FunctionDefLibrary proto;
  auto lib_def =
      std::make_unique<FunctionLibraryDefinition>(OpRegistry::Global(), proto);
  OptimizerOptions opts;
  auto proc_flr = std::make_unique<ProcessFunctionLibraryRuntime>(
      /*device_mgr=*/nullptr, Env::Default(), /*config=*/nullptr,
      TF_GRAPH_DEF_VERSION, lib_def.get(), opts);
  // Even without devices, the default FLR must exist.
  FunctionLibraryRuntime* flr =
      proc_flr->GetFLR(ProcessFunctionLibraryRuntime::kDefaultFLRDevice);
  EXPECT_NE(flr, nullptr);
}
341
TEST_F(ProcessFunctionLibraryRuntimeTest, DeviceSet) {
  FunctionDefLibrary proto;
  auto lib_def =
      std::make_unique<FunctionLibraryDefinition>(OpRegistry::Global(), proto);
  OptimizerOptions opts;
  // Move the held-back CPU:2 into its own device manager, simulating a
  // "remote" task.
  std::vector<std::unique_ptr<Device>> devices;
  devices.emplace_back(std::move(device2_));
  auto mgr = std::make_unique<DynamicDeviceMgr>();
  TF_CHECK_OK(mgr->AddDevices(std::move(devices)));

  // Without a cluster FLR, only local devices (CPU:0, CPU:1) are visible.
  auto proc_flr = std::make_unique<ProcessFunctionLibraryRuntime>(
      /*device_mgr=*/device_mgr_.get(), Env::Default(),
      /*config=*/nullptr, TF_GRAPH_DEF_VERSION, lib_def.get(), opts,
      /*thread_pool=*/nullptr);
  for (const char* name : {"/job:a/replica:0/task:0/device:CPU:0",
                           "/job:a/replica:0/task:0/device:CPU:1"}) {
    EXPECT_NE(nullptr, proc_flr->device_set()->FindDeviceByName(name));
  }

  // With a cluster FLR attached, its remote device (CPU:2) is visible too.
  cluster_flr_.reset(new TestClusterFLR(mgr.get()));
  proc_flr.reset(new ProcessFunctionLibraryRuntime(
      /*device_mgr=*/device_mgr_.get(), Env::Default(),
      /*config=*/nullptr, TF_GRAPH_DEF_VERSION, lib_def.get(), opts,
      /*thread_pool=*/nullptr, /*parent_=*/cluster_flr_.get()));
  EXPECT_NE(nullptr, proc_flr->device_set()->FindDeviceByName(
                         "/job:a/replica:0/task:0/device:CPU:2"));
}
370
TEST_F(ProcessFunctionLibraryRuntimeTest, Basic) {
  Init({});
  // GetFLR must resolve both short and fully-qualified device names to the
  // FLR of the matching device.
  auto expect_flr_for = [this](const string& device_name, Device* device) {
    FunctionLibraryRuntime* flr = proc_flr_->GetFLR(device_name);
    EXPECT_NE(flr, nullptr);
    EXPECT_EQ(flr->device(), device);
  };
  expect_flr_for("/job:a/replica:0/task:0/cpu:0", device0_);
  expect_flr_for("/job:a/replica:0/task:0/device:CPU:0", device0_);
  expect_flr_for("/device:CPU:0", device0_);
  expect_flr_for("/job:a/replica:0/task:0/cpu:1", device1_);
  // An unparsable device name yields no FLR.
  EXPECT_EQ(proc_flr_->GetFLR("abc"), nullptr);
}
389
TEST_F(ProcessFunctionLibraryRuntimeTest, GetDeviceIncarnation) {
  Init({});
  int64_t incarnation;
  TF_EXPECT_OK(proc_flr_->GetDeviceIncarnation("/job:a/replica:0/task:0/cpu:1",
                                               &incarnation));
  // Incarnation is a random number other than 0.
  EXPECT_NE(0, incarnation);
  // CPU:2 is not registered, so the lookup must fail.
  const Status lookup_status = proc_flr_->GetDeviceIncarnation(
      "/job:a/replica:0/task:0/cpu:2", &incarnation);
  EXPECT_EQ(error::INVALID_ARGUMENT, lookup_status.code());
}
401
TEST_F(ProcessFunctionLibraryRuntimeTest, SingleCall) {
  Init({test::function::XTimesTwo()});
  FunctionLibraryRuntime::InstantiateOptions inst_opts;
  inst_opts.target = "/job:a/replica:0/task:0/cpu:0";
  FunctionLibraryRuntime::Options opts;
  opts.source_device = "/job:a/replica:0/task:0/cpu:0";
  opts.remote_execution = true;
  const Tensor input = test::AsTensor<float>({1, 2, 3, 4});
  Tensor output;
  // XTimesTwo doubles every element.
  TF_CHECK_OK(Run("XTimesTwo", opts, {{"T", DT_FLOAT}}, inst_opts, {input},
                  {&output}));
  test::ExpectTensorEqual<float>(output, test::AsTensor<float>({2, 4, 6, 8}));
}
415
TEST_F(ProcessFunctionLibraryRuntimeTest, SingleCallFindDevice) {
  Init({test::function::FindDevice()});
  FunctionLibraryRuntime::InstantiateOptions inst_opts;
  inst_opts.target = "/job:a/replica:0/task:0/cpu:0";
  FunctionLibraryRuntime::Options opts;
  opts.source_device = "/job:a/replica:0/task:0/cpu:0";
  opts.remote_execution = true;
  Tensor result;
  TF_CHECK_OK(Run("FindDevice", opts, {}, inst_opts, {}, {&result}));
  test::ExpectTensorEqual<tstring>(
      result, test::AsTensor<tstring>({"/job:a/replica:0/task:0/device:CPU:0"},
                                      TensorShape({})));
  // Exactly one rendezvous was created for this step, and its refcount
  // returned to zero, i.e. it was released.
  EXPECT_EQ(1, rendezvous_ref_counts_.size());
  EXPECT_EQ(opts.step_id, rendezvous_ref_counts_.begin()->first);
  EXPECT_EQ(0, rendezvous_ref_counts_.begin()->second);
}
432
TEST_F(ProcessFunctionLibraryRuntimeTest, MultipleCallsSameDeviceXTimes) {
  Init({test::function::XTimesTwo(), test::function::XTimesFour()});
  FunctionLibraryRuntime::Options opts;
  opts.source_device = "/job:a/replica:0/task:0/cpu:0";
  opts.remote_execution = true;
  FunctionLibraryRuntime::InstantiateOptions inst_opts;
  inst_opts.target = "/job:a/replica:0/task:0/cpu:0";
  const Tensor input = test::AsTensor<float>({1, 2, 3, 4});

  // Runs `fn` on the shared target and checks the output elementwise.
  auto run_and_check = [&](const string& fn, const std::vector<float>& want) {
    Tensor out;
    TF_CHECK_OK(Run(fn, opts, {{"T", DT_FLOAT}}, inst_opts, {input}, {&out}));
    test::ExpectTensorEqual<float>(out, test::AsTensor<float>(want));
  };
  run_and_check("XTimesTwo", {2, 4, 6, 8});
  run_and_check("XTimesFour", {4, 8, 12, 16});
}
449
TEST_F(ProcessFunctionLibraryRuntimeTest, MultipleCallsSameDeviceFindDevice) {
  Init({test::function::FindDevice()});
  FunctionLibraryRuntime::Options opts;
  opts.source_device = "/job:a/replica:0/task:0/cpu:0";
  opts.remote_execution = true;
  FunctionLibraryRuntime::InstantiateOptions inst_opts;
  inst_opts.target = "/job:a/replica:0/task:0/cpu:1";
  // Two consecutive runs against the same target must both resolve to CPU:1.
  for (int call = 0; call < 2; ++call) {
    Tensor result;
    TF_CHECK_OK(Run("FindDevice", opts, {}, inst_opts, {}, {&result}));
    test::ExpectTensorEqual<tstring>(
        result,
        test::AsTensor<tstring>({"/job:a/replica:0/task:0/device:CPU:1"},
                                TensorShape({})));
  }
}
467
TEST_F(ProcessFunctionLibraryRuntimeTest, MultipleCallsDiffDeviceFindDevice) {
  Init({test::function::FindDevice()});
  FunctionLibraryRuntime::Options opts;
  opts.source_device = "/job:a/replica:0/task:0/cpu:0";
  opts.remote_execution = true;
  // FindDevice must report whichever device it was instantiated on.
  for (const char* device : {"/job:a/replica:0/task:0/device:CPU:0",
                             "/job:a/replica:0/task:0/device:CPU:1"}) {
    FunctionLibraryRuntime::InstantiateOptions inst_opts;
    inst_opts.target = device;
    Tensor result;
    TF_CHECK_OK(Run("FindDevice", opts, {}, inst_opts, {}, {&result}));
    test::ExpectTensorEqual<tstring>(
        result, test::AsTensor<tstring>({device}, TensorShape({})));
  }
}
487
TEST_F(ProcessFunctionLibraryRuntimeTest, InstantiateFunctionOnRemovedDevice) {
  // Register the held-back CPU:2, build the runtime (so its device set caches
  // CPU:2), then remove CPU:2 again to make the cached device set stale.
  std::vector<std::unique_ptr<Device>> devices;
  Device* device2_ptr = device2_.get();
  devices.emplace_back(std::move(device2_));
  TF_CHECK_OK(device_mgr_->AddDevices(std::move(devices)));

  Init({test::function::FindDevice()});
  std::vector<Device*> remove_devices{device2_ptr};
  TF_CHECK_OK(device_mgr_->RemoveDevices(std::move(remove_devices)));

  // Since the process FLR device set is not updated yet, it still holds the
  // raw pointer to device2. Make sure that function instantiation with device2
  // will not lead to segfault.
  FunctionLibraryRuntime::InstantiateOptions instantiate_opts;
  FunctionLibraryRuntime::Handle h;
  instantiate_opts.target = "/job:a/replica:0/task:0/device:CPU:1";
  instantiate_opts.is_multi_device_function = true;
  TF_CHECK_OK(Instantiate("FindDevice",
                          {{"_target", "/job:b/replica:0/task:0/device:CPU:2"}},
                          instantiate_opts, &h));
}
509
// Instantiating the same function on the same remote target twice must reuse
// the cluster-FLR handle; a new target gets the next handle. TestClusterFLR
// hands out handles sequentially starting at 0.
TEST_F(ProcessFunctionLibraryRuntimeTest, ClusterFLRSerialTest) {
  Init({test::function::FindDevice()});
  FunctionLibraryRuntime::InstantiateOptions instantiate_opts;
  instantiate_opts.target = "/job:b/replica:0/task:0/device:CPU:0";
  FunctionLibraryRuntime::Handle h;
  TF_CHECK_OK(Instantiate("FindDevice",
                          {{"_target", "/job:b/replica:0/task:0/device:CPU:0"}},
                          instantiate_opts, &h));
  // Job "b" is not this process's job ("a"), so the function is cross-process.
  bool is_cross_process = false;
  TF_CHECK_OK(proc_flr_->IsCrossProcess(h, &is_cross_process));
  EXPECT_TRUE(is_cross_process);
  EXPECT_EQ(0, proc_flr_->GetHandleOnDevice(
                   "/job:b/replica:0/task:0/device:CPU:0", h));
  // Re-instantiating on the same target reuses handle 0.
  TF_CHECK_OK(Instantiate("FindDevice",
                          {{"_target", "/job:b/replica:0/task:0/device:CPU:0"}},
                          instantiate_opts, &h));
  EXPECT_EQ(0, proc_flr_->GetHandleOnDevice(
                   "/job:b/replica:0/task:0/device:CPU:0", h));
  // A different target (job "c") gets the next handle, 1.
  instantiate_opts.target = "/job:c/replica:0/task:0/device:CPU:0";
  TF_CHECK_OK(Instantiate("FindDevice",
                          {{"_target", "/job:c/replica:0/task:0/device:CPU:0"}},
                          instantiate_opts, &h));
  EXPECT_EQ(1, proc_flr_->GetHandleOnDevice(
                   "/job:c/replica:0/task:0/device:CPU:0", h));
}
535
// Instantiates the same remote function from many threads concurrently; every
// thread must observe the same cluster-FLR handle (0).
TEST_F(ProcessFunctionLibraryRuntimeTest, ClusterFLRParallelTest) {
  Init({test::function::FindDevice()});
  FunctionLibraryRuntime::InstantiateOptions instantiate_opts;
  instantiate_opts.target = "/job:b/replica:0/task:0/device:CPU:0";

  auto fn = [this, &instantiate_opts]() {
    FunctionLibraryRuntime::Handle h;
    TF_CHECK_OK(Instantiate(
        "FindDevice", {{"_target", "/job:b/replica:0/task:0/device:CPU:0"}},
        instantiate_opts, &h));
    EXPECT_EQ(0, proc_flr_->GetHandleOnDevice(
                     "/job:b/replica:0/task:0/device:CPU:0", h));
  };

  {
    // RAII instead of the previous raw new/delete: the ThreadPool destructor
    // (run at scope exit) waits for all scheduled closures, so every `fn`
    // completes before the test ends even if an assertion above throws.
    thread::ThreadPool tp(Env::Default(), "test", 4);
    for (int i = 0; i < 100; ++i) {
      tp.Schedule(fn);
    }
  }
}
556
// Returns true iff `t`'s backing buffer lives in device (GPU) memory,
// determined via the CUDA/ROCm pointer-attribute APIs. In CPU-only builds
// calling this function aborts.
bool IsCUDATensor(const Tensor& t) {
#if GOOGLE_CUDA
  cudaPointerAttributes attributes;
  cudaError_t err =
      cudaPointerGetAttributes(&attributes, t.tensor_data().data());
  // cudaErrorInvalidValue: the pointer is unknown to CUDA, i.e. plain host
  // memory allocated outside the CUDA runtime.
  if (err == cudaErrorInvalidValue) return false;
  CHECK_EQ(cudaSuccess, err) << cudaGetErrorString(err);
  return (attributes.type == cudaMemoryTypeDevice);
#elif TENSORFLOW_USE_ROCM
  hipPointerAttribute_t attributes;
  hipError_t err = hipPointerGetAttributes(&attributes, t.tensor_data().data());
  if (err == hipErrorInvalidValue) return false;
  CHECK_EQ(hipSuccess, err) << hipGetErrorString(err);
  return (attributes.memoryType == hipMemoryTypeDevice);
#else
  // CHECK(false) aborts, so the missing return on this path is unreachable.
  CHECK(false)
      << "IsCUDATensor should not be called when CUDA is not available";
#endif  // GOOGLE_CUDA
}
576
// Runs TwoDeviceMult (one output placed on CPU, one on GPU) under
// `inst_opts`. If `error` is non-empty, expects the run to fail with
// INVALID_ARGUMENT and a message containing `error`; otherwise expects
// y_cpu == 2*x on the host and y_gpu == 3*x in GPU memory.
void TestTwoDeviceMult(
    ProcessFunctionLibraryRuntimeTest* fixture,
    const FunctionLibraryRuntime::InstantiateOptions& inst_opts,
    const string& error = "") {
  fixture->Init({test::function::TwoDeviceMult()});
  FunctionLibraryRuntime::Options opts;
  auto x = test::AsTensor<float>({1, 2, 3});
  Tensor y_cpu;
  Tensor y_gpu;
  Status status = fixture->Run("TwoDeviceMult", opts, {{"T", DT_FLOAT}},
                               inst_opts, {x}, {&y_cpu, &y_gpu});
  if (!error.empty()) {
    // Error path: verify both the status code and the message substring.
    EXPECT_TRUE(errors::IsInvalidArgument(status))
        << "Actual status: " << status;
    EXPECT_TRUE(absl::StrContains(status.error_message(), error))
        << "Actual error message: " << status.error_message();
    return;
  }

  EXPECT_TRUE(status.ok()) << "Actual status: " << status;
  // First output stays on the host: x * 2.
  EXPECT_FALSE(IsCUDATensor(y_cpu));
  test::ExpectTensorEqual<float>(y_cpu, test::AsTensor<float>({2, 4, 6}));

  // Second output is produced in GPU memory: x * 3.
  EXPECT_TRUE(IsCUDATensor(y_gpu));
  Tensor y_gpu_on_cpu = fixture->GPUToCPU(y_gpu);
  test::ExpectTensorEqual<float>(y_gpu_on_cpu,
                                 test::AsTensor<float>({3, 6, 9}));
}
605
TestInstantiateSimpleFunction(ProcessFunctionLibraryRuntimeTest * fixture,const FunctionLibraryRuntime::InstantiateOptions & orig_opts)606 void TestInstantiateSimpleFunction(
607 ProcessFunctionLibraryRuntimeTest* fixture,
608 const FunctionLibraryRuntime::InstantiateOptions& orig_opts) {
609 fixture->Init({test::function::FindDevice()});
610 FunctionLibraryRuntime::InstantiateOptions opts_copy = orig_opts;
611 opts_copy.input_devices.clear();
612 FunctionLibraryRuntime::Handle h;
613 TF_CHECK_OK(fixture->Instantiate(
614 "FindDevice", {{"_target", "/job:b/replica:0/task:0/device:CPU:0"}},
615 opts_copy, &h));
616 }
617
// Runs the ControlFlow test function (an identity routed through control-flow
// ops) with input/output moved between CPU and GPU according to
// inst_opts.input_devices / output_devices, and checks the value round-trips
// unchanged.
void TestControlFlow(
    ProcessFunctionLibraryRuntimeTest* fixture,
    const FunctionLibraryRuntime::InstantiateOptions& inst_opts) {
  fixture->Init({test::function::ControlFlow()});

  FunctionLibraryRuntime::Options opts;
  Tensor x1 = test::AsTensor<float>({3, 5, 17, 257});
  // Stage the input on the GPU when the function expects it there.
  if (absl::StrContains(inst_opts.input_devices[0], "GPU")) {
    x1 = fixture->CPUToGPU(x1);
  }
  Tensor y1;
  TF_CHECK_OK(fixture->Run("ControlFlow", opts, {}, inst_opts, {x1}, {&y1}));

  // Fetch the output back to the host if it was placed on the GPU.
  if (absl::StrContains(inst_opts.output_devices[0], "GPU")) {
    EXPECT_TRUE(IsCUDATensor(y1));
    y1 = fixture->GPUToCPU(y1);
  }
  test::ExpectTensorEqual<float>(y1, test::AsTensor<float>({3, 5, 17, 257}));
}
637
// Runs TwoDeviceInputOutput (y1 = 2*x1, y2 = 3*x2, computed on two devices)
// with each input/output staged on CPU or GPU per `inst_opts`, verifying both
// placement (IsCUDATensor) and values. Skips when no GPU is available.
void TestTwoDeviceInputOutput(
    ProcessFunctionLibraryRuntimeTest* fixture,
    const FunctionLibraryRuntime::InstantiateOptions& inst_opts) {
  if (fixture->gpu_device_ == nullptr) {
    GTEST_SKIP() << "No GPUs available";
  }
  fixture->Init({test::function::TwoDeviceInputOutput()});

  FunctionLibraryRuntime::Options opts;
  Tensor x1 = test::AsTensor<float>({1, 2});
  if (absl::StrContains(inst_opts.input_devices[0], "GPU")) {
    x1 = fixture->CPUToGPU(x1);
  }
  Tensor x2 = test::AsTensor<float>({10, 20});
  if (absl::StrContains(inst_opts.input_devices[1], "GPU")) {
    x2 = fixture->CPUToGPU(x2);
  }
  Tensor y1;
  Tensor y2;
  TF_CHECK_OK(fixture->Run("TwoDeviceInputOutput", opts, {{"T", DT_FLOAT}},
                           inst_opts, {x1, x2}, {&y1, &y2}));

  // y1 = 2 * x1; check it landed on the requested device.
  if (absl::StrContains(inst_opts.output_devices[0], "GPU")) {
    EXPECT_TRUE(IsCUDATensor(y1));
    y1 = fixture->GPUToCPU(y1);
  } else {
    EXPECT_FALSE(IsCUDATensor(y1));
  }
  test::ExpectTensorEqual<float>(y1, test::AsTensor<float>({2, 4}));

  // y2 = 3 * x2; check it landed on the requested device.
  if (absl::StrContains(inst_opts.output_devices[1], "GPU")) {
    EXPECT_TRUE(IsCUDATensor(y2));
    y2 = fixture->GPUToCPU(y2);
  } else {
    EXPECT_FALSE(IsCUDATensor(y2));
  }
  test::ExpectTensorEqual<float>(y2, test::AsTensor<float>({30, 60}));
}
676
// Expands each short device name (e.g. "CPU:0") into a fully-qualified name
// by prepending `prefix`. The default prefix matches the job/replica/task
// used throughout this test file, so existing callers are unaffected; tests
// for other tasks may supply their own prefix.
std::vector<std::string> CompleteDevices(
    const std::vector<std::string>& v,
    const std::string& prefix = "/job:a/replica:0/task:0/device:") {
  std::vector<std::string> result;
  result.reserve(v.size());
  for (const std::string& s : v) {
    // Plain std::string concatenation; no need for strings::StrCat here.
    result.push_back(prefix + s);
  }
  return result;
}
685
// Builds InstantiateOptions for a multi-device function: `target` is the
// instantiation target device, and the input/output device lists are expanded
// to fully-qualified names under /job:a/replica:0/task:0 via CompleteDevices.
FunctionLibraryRuntime::InstantiateOptions MakeOptions(
    const string& target, const std::vector<string>& input_devices,
    const std::vector<string>& output_devices) {
  FunctionLibraryRuntime::InstantiateOptions inst_opts;
  inst_opts.target = target;
  inst_opts.input_devices = CompleteDevices(input_devices);
  inst_opts.output_devices = CompleteDevices(output_devices);
  inst_opts.is_multi_device_function = true;
  return inst_opts;
}
696
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_ExplicitOutputDevice) {
  if (gpu_device_ == nullptr) {
    GTEST_SKIP() << "No GPUs available";
  }
  // Both output devices are pinned explicitly: CPU:0 and GPU:0.
  const auto inst_opts = MakeOptions("CPU:0", {"CPU:0"}, {"CPU:0", "GPU:0"});
  TestTwoDeviceMult(this, inst_opts);
}
703
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_InferredOutputDevice) {
  if (gpu_device_ == nullptr) {
    GTEST_SKIP() << "No GPUs available";
  }
  // Output devices are left empty, so placement must be inferred.
  const auto inst_opts = MakeOptions("CPU:0", {"CPU:0"}, {});
  TestTwoDeviceMult(this, inst_opts);
}
710
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_ErrorWhenNoInputDevices) {
  if (gpu_device_ == nullptr) {
    GTEST_SKIP() << "No GPUs available";
  }
  // Zero input devices for a one-input function must be rejected.
  const auto inst_opts = MakeOptions("CPU:0", {}, {});
  TestTwoDeviceMult(this, inst_opts, "input_devices must have the same length");
}
718
// Two input devices are given for a single-input function; instantiation must
// fail with a length-mismatch error.
TEST_F(ProcessFunctionLibraryRuntimeTest,
       MultiDevice_ErrorWhenTooManyInputDevices) {
  if (gpu_device_ == nullptr) {
    GTEST_SKIP() << "No GPUs available";
  }
  TestTwoDeviceMult(this, MakeOptions("CPU:0", {"CPU:0", "CPU:1"}, {}),
                    "input_devices must have the same length");
}
727
// Three output devices are given for a two-output function; instantiation
// must fail. (No GPU guard needed: the error is raised before placement.)
TEST_F(ProcessFunctionLibraryRuntimeTest,
       MultiDevice_ErrorWhenTooManyOutputDevices) {
  TestTwoDeviceMult(
      this, MakeOptions("CPU:0", {"CPU:0"}, {"CPU:0", "GPU:0", "CPU:1"}),
      "output_devices must either be empty or have the same length");
}
734
// The target device "GPU:11" does not exist; instantiation must fail with a
// descriptive error.
TEST_F(ProcessFunctionLibraryRuntimeTest,
       MultiDevice_ErrorWhenBadTargetDevice) {
  TestTwoDeviceMult(
      this, MakeOptions("GPU:11", {"CPU:0"}, {"CPU:0", "GPU:0"}),
      "Cannot instantiate multi-device function with target device GPU:11");
}
741
// Multi-device functions do not support list-typed (variadic) inputs;
// instantiation must fail with InvalidArgument naming the offending input.
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_ErrorWhenListInput) {
  const FunctionDef& def = test::function::FuncWithListInput();
  Init({def});
  FunctionLibraryRuntime::Handle handle;
  Status status = proc_flr_->Instantiate(
      "FuncWithListInput", test::function::Attrs({{"T", DT_FLOAT}, {"N", 1}}),
      MakeOptions("CPU:0", {"CPU:0"}, {}), &handle);
  ASSERT_TRUE(errors::IsInvalidArgument(status)) << "Actual status: " << status;
  ASSERT_TRUE(absl::StrContains(
      status.error_message(),
      "FuncWithListInput has an input named \"x1\" that is a list of tensors"))
      << "Actual error message: " << status.error_message();
}
755
// Multi-device functions do not support list-typed (variadic) outputs;
// instantiation must fail with InvalidArgument naming the offending output.
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_ErrorWhenListOutput) {
  const FunctionDef& def = test::function::FuncWithListOutput();
  Init({def});
  FunctionLibraryRuntime::Handle handle;
  Status status = proc_flr_->Instantiate(
      "FuncWithListOutput", test::function::Attrs({{"T", DT_FLOAT}, {"N", 1}}),
      MakeOptions("CPU:0", {}, {"CPU:0"}), &handle);
  ASSERT_TRUE(errors::IsInvalidArgument(status)) << "Actual status: " << status;
  ASSERT_TRUE(absl::StrContains(
      status.error_message(),
      "FuncWithListOutput has an output named \"y\" that is a list of tensors"))
      << "Actual error message: " << status.error_message();
}
769
// Two inputs and two outputs, each pinned to an explicit device (CPU + GPU).
TEST_F(ProcessFunctionLibraryRuntimeTest,
       MultiDevice_ExplicitMultiInputOutput) {
  TestTwoDeviceInputOutput(
      this, MakeOptions("CPU:0", {"CPU:0", "GPU:0"}, {"CPU:0", "GPU:0"}));
}
775
// Same as above but with the input devices swapped (GPU input first).
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_FlipInputs) {
  TestTwoDeviceInputOutput(
      this, MakeOptions("CPU:0", {"GPU:0", "CPU:0"}, {"CPU:0", "GPU:0"}));
}
780
// Same as ExplicitMultiInputOutput but with the output devices swapped.
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_FlipOutputs) {
  TestTwoDeviceInputOutput(
      this, MakeOptions("CPU:0", {"CPU:0", "GPU:0"}, {"GPU:0", "CPU:0"}));
}
785
// Both input and output device lists swapped relative to the base case.
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_FlipBoth) {
  TestTwoDeviceInputOutput(
      this, MakeOptions("CPU:0", {"GPU:0", "CPU:0"}, {"GPU:0", "CPU:0"}));
}
790
// EmptyBodySwap forwards input 1 to output 0 and input 0 to output 1 with no
// compute nodes; the runtime must still honor the requested cross-device
// placement of the forwarded tensors.
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_EmptyBodySwap) {
  if (gpu_device_ == nullptr) {
    GTEST_SKIP() << "No GPUs available";
  }
  // Input 0 on GPU, input 1 on CPU; output 0 on CPU, output 1 on GPU.
  FunctionLibraryRuntime::InstantiateOptions inst_opts =
      MakeOptions("CPU:0", {"GPU:0", "CPU:0"}, {"CPU:0", "GPU:0"});
  Init({test::function::EmptyBodySwap()});

  Tensor x1 = CPUToGPU(test::AsTensor<float>({1, 2}));
  Tensor x2 = test::AsTensor<float>({10, 20});
  Tensor y1;
  Tensor y2;
  TF_CHECK_OK(Run("EmptyBodySwap", {}, {{"T", DT_FLOAT}}, inst_opts, {x1, x2},
                  {&y1, &y2}));

  // y1 is x2 swapped to CPU (it already lived there).
  EXPECT_FALSE(IsCUDATensor(y1));
  test::ExpectTensorEqual<float>(y1, test::AsTensor<float>({10, 20}));

  // y2 is x1, still on GPU as requested.
  EXPECT_TRUE(IsCUDATensor(y2));
  y2 = GPUToCPU(y2);
  test::ExpectTensorEqual<float>(y2, test::AsTensor<float>({1, 2}));
}
813
GetResourceHandle(const string & var_name,const string & container,const string & device_name)814 Tensor GetResourceHandle(const string& var_name, const string& container,
815 const string& device_name) {
816 ResourceHandle handle;
817 handle.set_device(device_name);
818 handle.set_container(container);
819 handle.set_name(var_name);
820 handle.set_hash_code(TypeIndex::Make<Var>().hash_code());
821 handle.set_maybe_type_name(TypeIndex::Make<Var>().name());
822 Tensor tensor(DT_RESOURCE, TensorShape({}));
823 tensor.scalar<ResourceHandle>()() = handle;
824 return tensor;
825 }
826
// Returns a function which adds two variables on different devices.
// The single resource input `x` is read once on CPU:0 and once on CPU:1
// (so with a composite-device/packed input each read resolves to a different
// underlying handle), and the two values are summed on CPU:0.
FunctionDef AddVarAcrossDevices() {
  return FunctionDefHelper::Create(
      // Name
      "AddVarAcrossDevices",
      // Args
      {"x: resource"},
      // Return values
      {"y: float"},
      // Attr def
      {},
      // Nodes
      {
          {{"read0"},
           "ReadVariableOp",
           {"x"},
           {{"dtype", DT_FLOAT}},
           {},
           "/device:CPU:0"},
          {{"read1"},
           "ReadVariableOp",
           {"x"},
           {{"dtype", DT_FLOAT}},
           {},
           "/device:CPU:1"},
          {{"add"},
           "Add",
           {"read0:value:0", "read1:value:0"},
           {{"T", DT_FLOAT}},
           {},
           "/device:CPU:0"},
      },
      {{"y", "add:z:0"}});
}
861
862 // An implementation of FunctionArgsInterface for packed inputs.
863 class TestFunctionPackedArgs : public FunctionArgsInterface {
864 public:
TestFunctionPackedArgs(const int index,gtl::InlinedVector<TensorValue,4> && tensor_args)865 TestFunctionPackedArgs(const int index,
866 gtl::InlinedVector<TensorValue, 4>&& tensor_args) {
867 packed_args_.emplace(index, std::move(tensor_args));
868 }
869
~TestFunctionPackedArgs()870 ~TestFunctionPackedArgs() override{};
871
HasRemoteOrPackedInputs() const872 bool HasRemoteOrPackedInputs() const override { return true; };
873
GetLocalArg(const FunctionArgIndex & index,Tensor * val) const874 Status GetLocalArg(const FunctionArgIndex& index,
875 Tensor* val) const override {
876 *val = *packed_args_.at(index.index).at(index.sub_index).tensor;
877 return OkStatus();
878 };
879
GetLocalTensors() const880 std::vector<Tensor> GetLocalTensors() const override { return {}; }
881
882 private:
883 absl::flat_hash_map<int, gtl::InlinedVector<TensorValue, 4>> packed_args_;
884 };
885
// Runs AddVarAcrossDevices with a "packed" resource input: two variable
// handles living on two CPU devices, grouped under one CompositeDevice.
// Exercises both calling conventions — a packed TensorHandle via
// FunctionArgsInterface and a flat DT_RESOURCE tensor of two handles.
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_CompositeDevice) {
  Init({AddVarAcrossDevices()});
  // Create two variables on two devices.
  const Tensor initial_resource_value0 = test::AsTensor<float>({10, 20});
  Var* resource0 = new Var(DT_FLOAT);
  *resource0->tensor() = initial_resource_value0;
  resource0->is_initialized = true;
  const Tensor initial_resource_value1 = test::AsTensor<float>({30, 40});
  Var* resource1 = new Var(DT_FLOAT);
  *resource1->tensor() = initial_resource_value1;
  resource1->is_initialized = true;
  // Ownership of resource0/resource1 passes to the resource managers.
  ResourceMgr* mgr0 = device0_->resource_manager();
  ResourceMgr* mgr1 = device1_->resource_manager();
  TF_ASSERT_OK(mgr0->Create(mgr0->default_container(), "var", resource0));
  TF_ASSERT_OK(mgr1->Create(mgr1->default_container(), "var", resource1));

  Tensor resource_handle0 =
      GetResourceHandle("var", mgr0->default_container(), device0_->name());
  Tensor resource_handle1 =
      GetResourceHandle("var", mgr1->default_container(), device1_->name());

  // Create a CompositeDevice
  Status s;
  std::unique_ptr<CompositeDevice> composite_device =
      CompositeDevice::MakeDevice({device0_->name(), device1_->name()},
                                  /*unique_device_id=*/0,
                                  device_mgr_->HostCPU()->parsed_name(), &s);
  TF_ASSERT_OK(s);
  AddCompositeDevice(composite_device.get());

  FunctionLibraryRuntime::Options opts;
  // Input 0 lives on the composite device; tell the runtime which physical
  // devices back it and the dtype/shape of the underlying variable values.
  FunctionLibraryRuntime::InstantiateOptions inst_opts =
      MakeOptions("CPU:0", {"COMPOSITE:0"}, {"CPU:0"});
  inst_opts.composite_devices[composite_device->name()] =
      composite_device->underlying_devices();
  inst_opts.input_resource_dtypes_and_shapes[0] = {
      initial_resource_value0.dtype(), initial_resource_value0.shape()};

  // Packed TensorHandle
  {
    gtl::InlinedVector<TensorValue, 4> handles;
    handles.push_back(TensorValue(&resource_handle0));
    handles.push_back(TensorValue(&resource_handle1));
    TestFunctionPackedArgs args(0, std::move(handles));
    FunctionRet ret;
    TF_CHECK_OK(RunWithPackedArgs("AddVarAcrossDevices", opts,
                                  {{"T", DT_FLOAT}}, inst_opts, args, {&ret}));
    EXPECT_EQ(ret.index(), 0);
    // {10,20} + {30,40} == {40,60}.
    test::ExpectTensorEqual<float>(absl::get<Tensor>(ret),
                                   test::AsTensor<float>({40, 60}));
  }

  // Packed Tensor
  {
    Tensor arg(DT_RESOURCE, TensorShape({2}));
    arg.flat<ResourceHandle>()(0) = resource_handle0.scalar<ResourceHandle>()();
    arg.flat<ResourceHandle>()(1) = resource_handle1.scalar<ResourceHandle>()();

    Tensor ret;
    TF_CHECK_OK(Run("AddVarAcrossDevices", opts, {{"T", DT_FLOAT}}, inst_opts,
                    {arg}, {&ret}));
    test::ExpectTensorEqual<float>(ret, test::AsTensor<float>({40, 60}));
  }
}
950
// A function that forwards a GPU resource handle to its output: the returned
// handle tensor must stay on CPU (handles are host metadata) while the value
// it refers to remains on GPU, and the handle must stay usable by a second
// function that reads the variable on GPU.
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_ResourceOutput_GPU) {
  if (gpu_device_ == nullptr) {
    GTEST_SKIP() << "No GPUs available";
  }
  FunctionLibraryRuntime::InstantiateOptions inst_opts =
      MakeOptions("CPU:0", {"GPU:0", "GPU:0"}, {"GPU:0", "GPU:0"});
  Init({test::function::ResourceOutput(),
        test::function::ReadResourceVariable()});

  // Make resource var
  Tensor resource_value = CPUToGPU(test::AsTensor<float>({10, 20}));
  Var* resource = new Var(DT_FLOAT);
  *resource->tensor() = resource_value;
  resource->is_initialized = true;
  ResourceMgr* mgr = gpu_device_->resource_manager();
  Status status = mgr->Create(mgr->default_container(), "my_gpu_var", resource);
  ASSERT_TRUE(status.ok()) << status.error_message();

  // Run the function taking a resource and outputting it
  FunctionLibraryRuntime::Options opts;
  Tensor x1 = CPUToGPU(test::AsTensor<float>({1, 2}));
  Tensor x2 = GetResourceHandle("my_gpu_var", mgr->default_container(),
                                "/job:a/replica:0/task:0/device:GPU:0");
  Tensor returned_handle;
  Tensor y2;
  TF_CHECK_OK(Run("ResourceOutput", opts, {{"T", DT_FLOAT}}, inst_opts,
                  {x1, x2}, {&returned_handle, &y2}));

  // The handle itself is host memory even though its output device is GPU.
  EXPECT_FALSE(IsCUDATensor(returned_handle));
  EXPECT_TRUE(IsCUDATensor(y2));
  y2 = GPUToCPU(y2);
  test::ExpectTensorEqual<float>(y2, test::AsTensor<float>({2, 4}));

  // Read the variable using the handle returned from previous function to
  // make sure the handle and read value is on the right device.
  inst_opts = MakeOptions("GPU:0", {"GPU:0"}, {"GPU:0"});
  Tensor read_resource;
  TF_CHECK_OK(Run("ReadResourceVariable", opts, {{"T", DT_FLOAT}}, inst_opts,
                  {returned_handle}, {&read_resource}));
  EXPECT_TRUE(IsCUDATensor(read_resource));
  read_resource = GPUToCPU(read_resource);
  test::ExpectTensorEqual<float>(read_resource,
                                 test::AsTensor<float>({10, 20}));
}
995
// Requests a placement the Placer cannot satisfy and verifies the error.
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_PlacerError) {
  if (gpu_device_ == nullptr) {
    GTEST_SKIP() << "No GPUs available";
  }
  // ResourceOutput forwards second input to first output. Both are resources.
  // Placer should not be able to place this graph because we ask it to place
  // second input on GPU but first output to CPU.
  FunctionLibraryRuntime::InstantiateOptions inst_opts =
      MakeOptions("CPU:0", {"GPU:0", "GPU:0"}, {"CPU:0", "GPU:0"});
  Init({test::function::ResourceOutput(),
        test::function::ReadResourceVariable()});

  FunctionLibraryRuntime::Handle handle;
  Status status = proc_flr_->Instantiate(
      "ResourceOutput", test::function::Attrs({{"T", DT_FLOAT}}), inst_opts,
      &handle);
  ASSERT_TRUE(errors::IsInvalidArgument(status)) << "Actual status: " << status;
  ASSERT_TRUE(absl::StrContains(status.error_message(), "Cannot place"));
}
1015
// An op whose kernel fails both at construction time and at compute time.
// Used below to verify that eager kernel creation surfaces construction
// errors during function instantiation.
REGISTER_OP("BrokenOp")
    .Input("in: T")
    .Output("out: T")
    .Attr("T: type")
    .SetShapeFn(shape_inference::UnknownShape);
class BrokenOp : public OpKernel {
 public:
  explicit BrokenOp(OpKernelConstruction* ctx) : OpKernel(ctx) {
    // Fail at kernel-construction time.
    ctx->SetStatus(errors::Internal("I am broken"));
  }

  void Compute(OpKernelContext* ctx) override {
    // Also fail if the kernel is ever actually run.
    ctx->SetStatus(errors::Internal("I am broken"));
  }
};
REGISTER_KERNEL_BUILDER(Name("BrokenOp").Device(DEVICE_CPU), BrokenOp);
1032
// Instantiation with lazy kernel creation succeeds for a function whose
// kernel cannot be constructed; with create_kernels_eagerly the construction
// error must surface at instantiation time instead.
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_CreateKernelsEagerly) {
  auto T = DT_INT32;
  // A function wrapping BrokenOp, whose kernel constructor always fails.
  FunctionDef broken_func = FunctionDefHelper::Define(
      // Name
      "Broken",
      // Args
      {"x: int32"},
      // Return values
      {"y: int32"},
      // Attrs
      {},
      // Nodes
      {{{"y"}, "BrokenOp", {"x"}, {{"T", T}}}});
  Init({broken_func});

  FunctionLibraryRuntime::InstantiateOptions inst_opts =
      MakeOptions("CPU:0", {"CPU:0"}, {"CPU:0"});

  // Instantiating the broken function should work.
  FunctionLibraryRuntime::Handle handle;
  TF_CHECK_OK(Instantiate("Broken", {{"T", DT_INT32}}, inst_opts, &handle));
  TF_CHECK_OK(proc_flr_->ReleaseHandle(handle));

  // Instantiating the broken function while creating kernels eagerly should
  // fail.
  inst_opts.create_kernels_eagerly = true;
  Status status = Instantiate("Broken", {{"T", DT_INT32}}, inst_opts, &handle);
  EXPECT_TRUE(errors::IsInternal(status));
}
1063
// Verifies state-handle semantics for stateful functions: instantiating with
// the same (empty) state handle shares kernel state (the random sequence
// continues), while a new state handle restarts the sequence, and releasing
// a handle discards its state.
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_StateHandle) {
  auto T = DT_INT32;
  // The expected sequence of outputs from this function is [6, 4, 0, 1, ...].
  FunctionDef stateful_func = FunctionDefHelper::Define(
      // Name
      "RandomUniformWrapper",
      // Args
      {"x: resource"},
      // Return values
      {"y: int32"},
      // Attrs
      {},
      // Nodes
      {FunctionDefHelper::Const<int32>("shape", gtl::ArraySlice<int32>({1})),
       FunctionDefHelper::Const<int32>("minval", 0),
       {{"maxval"}, "ReadVariableOp", {"x"}, {{"dtype", T}}, {}},
       // A stateful node.
       {{"y"},
        "RandomUniformInt",
        {"shape", "minval", "maxval"},
        {{"seed", 37}, {"seed2", 48}, {"Tout", T}, {"T", T}}}});
  Init({stateful_func});
  if (gpu_device_ == nullptr) {
    GTEST_SKIP() << "No GPUs available";
  }

  // Make resource variables.
  ResourceMgr* mgr = gpu_device_->resource_manager();
  Tensor resource_value = CPUToGPU(test::AsScalar<int>(10));
  Var* resource = new Var(T);
  *resource->tensor() = resource_value;
  resource->is_initialized = true;
  Status status = mgr->Create(mgr->default_container(), "my_gpu_var", resource);
  ASSERT_TRUE(status.ok()) << status.error_message();

  Tensor x = GetResourceHandle("my_gpu_var", mgr->default_container(),
                               "/job:a/replica:0/task:0/device:GPU:0");
  Tensor y;

  FunctionLibraryRuntime::InstantiateOptions inst_opts =
      MakeOptions("CPU:0", {"GPU:0"}, {"CPU:0"});

  // Instantiate the function with no state handle.
  FunctionLibraryRuntime::Handle handle;
  TF_CHECK_OK(Instantiate("RandomUniformWrapper", {{"T", DT_INT32}}, inst_opts,
                          &handle));
  for (auto expected : {6, 4}) {
    TF_CHECK_OK(RunInstantiated(handle, {}, {x}, {&y}));
    test::ExpectTensorEqual<int>(y, test::AsTensor<int>({expected}));
  }

  // Instantiating the function again with no state handle should result in the
  // same handle.
  FunctionLibraryRuntime::Handle other_handle;
  TF_CHECK_OK(Instantiate("RandomUniformWrapper", {{"T", DT_INT32}}, inst_opts,
                          &other_handle));
  EXPECT_EQ(handle, other_handle);
  // Running the function should yield continuation of the same sequence.
  for (auto expected : {0, 1}) {
    TF_CHECK_OK(RunInstantiated(other_handle, {}, {x}, {&y}));
    test::ExpectTensorEqual<int>(y, test::AsTensor<int>({expected}));
  }

  // Instantiating the function with a state handle should result in a different
  // handle.
  inst_opts.state_handle = "handle_1";
  TF_CHECK_OK(Instantiate("RandomUniformWrapper", {{"T", DT_INT32}}, inst_opts,
                          &other_handle));
  EXPECT_NE(handle, other_handle);
  // Running the function should yield the original sequence.
  for (auto expected : {6, 4, 0, 1}) {
    TF_CHECK_OK(RunInstantiated(other_handle, {}, {x}, {&y}));
    test::ExpectTensorEqual<int>(y, test::AsTensor<int>({expected}));
  }

  // Instantiating the function with a different state handle should result in a
  // different handle.
  inst_opts.state_handle = "handle_2";
  TF_CHECK_OK(Instantiate("RandomUniformWrapper", {{"T", DT_INT32}}, inst_opts,
                          &other_handle));
  EXPECT_NE(handle, other_handle);
  // Running the function should yield the original sequence.
  for (auto expected : {6, 4, 0, 1}) {
    TF_CHECK_OK(RunInstantiated(other_handle, {}, {x}, {&y}));
    test::ExpectTensorEqual<int>(y, test::AsTensor<int>({expected}));
  }

  // Repeatedly instantiating a function and releasing its handle will result in
  // repeating the original sequence.
  inst_opts.state_handle = "handle_3";
  for (int i = 0; i < 2; ++i) {
    TF_CHECK_OK(Instantiate("RandomUniformWrapper", {{"T", DT_INT32}},
                            inst_opts, &other_handle));
    EXPECT_NE(handle, other_handle);
    // Running the function should yield the original sequence.
    for (auto expected : {6, 4, 0, 1}) {
      TF_CHECK_OK(RunInstantiated(other_handle, {}, {x}, {&y}));
      test::ExpectTensorEqual<int>(y, test::AsTensor<int>({expected}));
    }
    TF_CHECK_OK(proc_flr_->ReleaseHandle(other_handle));
  }
}
1166
// An op that outputs the session metadata visible to its kernel as a debug
// string, or the empty string when no metadata was configured. Used by the
// SessionMetadata* tests below.
REGISTER_OP("SessionMetadataReader")
    .Input("x: int64")
    .Output("y: string")
    .SetIsStateful()
    .Doc(R"doc(SessionMetadataReader returns the session metadata.

x: int64
y: string
)doc");

class SessionMetadataReaderOp : public OpKernel {
 public:
  explicit SessionMetadataReaderOp(OpKernelConstruction* ctx) : OpKernel(ctx) {}
  void Compute(OpKernelContext* ctx) override {
    Tensor* out_tensor = nullptr;
    OP_REQUIRES_OK(ctx,
                   ctx->allocate_output("y", TensorShape({}), &out_tensor));
    // Serialize the metadata proto if present; empty string otherwise.
    if (ctx->session_metadata() != nullptr) {
      out_tensor->scalar<tstring>()() = ctx->session_metadata()->DebugString();
    } else {
      out_tensor->scalar<tstring>()() = "";
    }
  }
};
REGISTER_KERNEL_BUILDER(Name("SessionMetadataReader").Device(DEVICE_CPU),
                        SessionMetadataReaderOp);
1193
// A single-node function wrapping the SessionMetadataReader op above.
FunctionDef SessionMetadataReaderOpFn() {
  return FunctionDefHelper::Define(
      // Name
      "SessionMetadataReaderFn",
      // Args
      {"x: int64"},
      // Return values
      {"y: string"},
      // Attr def
      {},
      // Nodes
      {{{"y"}, "SessionMetadataReader", {"x"}, {}}});
}
1207
// With no session metadata configured, the kernel must observe none and
// return the empty string.
TEST_F(ProcessFunctionLibraryRuntimeTest, SessionMetadataAbsent) {
  Init({SessionMetadataReaderOpFn()}, /*session_metadata=*/nullptr);
  FunctionLibraryRuntime::Options opts;
  opts.source_device = "/job:a/replica:0/task:0/cpu:0";
  opts.remote_execution = true;
  FunctionLibraryRuntime::InstantiateOptions instantiate_opts;
  instantiate_opts.target = "/job:a/replica:0/task:0/cpu:0";
  const auto x = test::AsTensor<int64_t>({17});
  Tensor y;
  TF_CHECK_OK(
      Run("SessionMetadataReaderFn", opts, {}, instantiate_opts, {x}, {&y}));
  EXPECT_EQ("", y.scalar<tstring>()());
}
1221
// With session metadata configured, the kernel must observe it; the output is
// parsed back into a proto and compared field-by-field.
TEST_F(ProcessFunctionLibraryRuntimeTest, SessionMetadataPresent) {
  const SessionMetadata session_metadata = GenerateSessionMetadata();
  Init({SessionMetadataReaderOpFn()}, &session_metadata);
  FunctionLibraryRuntime::Options opts;
  opts.source_device = "/job:a/replica:0/task:0/cpu:0";
  opts.remote_execution = true;
  FunctionLibraryRuntime::InstantiateOptions instantiate_opts;
  instantiate_opts.target = "/job:a/replica:0/task:0/cpu:0";
  const auto x = test::AsTensor<int64_t>({17});
  Tensor y;
  TF_CHECK_OK(
      Run("SessionMetadataReaderFn", opts, {}, instantiate_opts, {x}, {&y}));
  SessionMetadata read_metadata;
  ASSERT_TRUE(protobuf::TextFormat::ParseFromString(y.scalar<tstring>()(),
                                                    &read_metadata));
  EXPECT_EQ(session_metadata.name(), read_metadata.name());
  EXPECT_EQ(session_metadata.version(), read_metadata.version());
}
1240
// Composite devices registered before cloning must be visible in the cloned
// runtime's device set.
TEST_F(ProcessFunctionLibraryRuntimeTest, CompositeDevicesAfterCloning) {
  Init({AddVarAcrossDevices()});

  Status s;
  std::unique_ptr<CompositeDevice> composite_device =
      CompositeDevice::MakeDevice({device0_->name(), device1_->name()},
                                  /*unique_device_id=*/0,
                                  device_mgr_->HostCPU()->parsed_name(), &s);
  TF_ASSERT_OK(s);
  AddCompositeDevice(composite_device.get());

  auto* flr = proc_flr_->GetFLR("/job:a/replica:0/task:0/cpu:0");
  ASSERT_NE(nullptr, flr);
  std::unique_ptr<FunctionLibraryDefinition> cloned_lib_def;
  std::unique_ptr<ProcessFunctionLibraryRuntime> cloned_proc_flr;
  FunctionLibraryRuntime* cloned_flr;
  TF_ASSERT_OK(flr->Clone(&cloned_lib_def, &cloned_proc_flr, &cloned_flr));
  // The clone must resolve the composite device name to the same object.
  EXPECT_EQ(
      cloned_proc_flr->device_set()->FindDeviceByName(composite_device->name()),
      composite_device.get());
}
1262
// Session metadata must survive cloning of the runtime: a function run via
// the cloned ProcessFunctionLibraryRuntime still observes it.
TEST_F(ProcessFunctionLibraryRuntimeTest, SessionMetadataPresentAfterCloning) {
  const SessionMetadata session_metadata = GenerateSessionMetadata();
  Init({SessionMetadataReaderOpFn()}, &session_metadata);
  auto* flr = proc_flr_->GetFLR("/job:a/replica:0/task:0/cpu:0");
  ASSERT_NE(nullptr, flr);
  std::unique_ptr<FunctionLibraryDefinition> cloned_lib_def;
  std::unique_ptr<ProcessFunctionLibraryRuntime> cloned_proc_flr;
  FunctionLibraryRuntime* cloned_flr;
  TF_ASSERT_OK(flr->Clone(&cloned_lib_def, &cloned_proc_flr, &cloned_flr));
  FunctionLibraryRuntime::Options opts;
  opts.source_device = "/job:a/replica:0/task:0/cpu:0";
  opts.remote_execution = true;
  FunctionLibraryRuntime::InstantiateOptions instantiate_opts;
  instantiate_opts.target = "/job:a/replica:0/task:0/cpu:0";
  const auto x = test::AsTensor<int64_t>({17});
  Tensor y;
  // Run against the cloned runtime, not proc_flr_.
  Status s = RunWithRuntime<std::vector<Tensor>, Tensor>(
      "SessionMetadataReaderFn", opts, {}, instantiate_opts, {x}, {&y},
      cloned_proc_flr.get());
  TF_CHECK_OK(s);
  SessionMetadata read_metadata;
  ASSERT_TRUE(protobuf::TextFormat::ParseFromString(y.scalar<tstring>()(),
                                                    &read_metadata));
  EXPECT_EQ(session_metadata.name(), read_metadata.name());
  EXPECT_EQ(session_metadata.version(), read_metadata.version());
}
1289
// A trivial single-device function should be classified as safe for
// synchronous execution (checked via the async-summary metric delta).
TEST_F(ProcessFunctionLibraryRuntimeTest, SimpleGraphAllowsSync) {
  auto async_safe =
      metrics::TestDelta("subgraph_async_summary", "safe_for_sync");
  FunctionLibraryRuntime::InstantiateOptions opts =
      MakeOptions("CPU:0", {}, {});
  opts.allow_small_function_optimizations = true;
  TestInstantiateSimpleFunction(this, opts);
  EXPECT_GT(async_safe.Get(), 0);
}
1299
// A function containing control flow must be classified as requiring async
// execution (an "unsafe op"), never as safe-for-sync.
TEST_F(ProcessFunctionLibraryRuntimeTest, UnsafeOpRequiresAsync) {
  auto async_safe =
      metrics::TestDelta("subgraph_async_summary", "safe_for_sync");
  auto async_unsafe_op =
      metrics::TestDelta("subgraph_async_summary", "unsafe_op");
  FunctionLibraryRuntime::InstantiateOptions opts =
      MakeOptions("CPU:0", {"CPU:0"}, {"CPU:0"});
  opts.allow_small_function_optimizations = true;
  TestControlFlow(this, opts);
  EXPECT_EQ(async_safe.Get(), 0);
  EXPECT_GT(async_unsafe_op.Get(), 0);
}
1312
// A function partitioned across CPU and GPU yields send-only and recv-only
// subgraphs, both of which must be counted as requiring async execution.
TEST_F(ProcessFunctionLibraryRuntimeTest, PartitionedGraphRequiresAsync) {
  if (gpu_device_ == nullptr) {
    GTEST_SKIP() << "No GPUs available";
  }
  auto async_send_only =
      metrics::TestDelta("subgraph_async_summary", "send_only");
  auto async_recv_only =
      metrics::TestDelta("subgraph_async_summary", "recv_only");
  FunctionLibraryRuntime::InstantiateOptions opts =
      MakeOptions("CPU:0", {"CPU:0"}, {"CPU:0", "GPU:0"});
  opts.allow_small_function_optimizations = true;
  TestTwoDeviceMult(this, opts);
  EXPECT_GT(async_send_only.Get(), 0);
  EXPECT_GT(async_recv_only.Get(), 0);
}
1328
1329 } // anonymous namespace
1330 } // namespace tensorflow
1331