1 /* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 #include "tensorflow/core/common_runtime/process_function_library_runtime.h"
16 
17 #include <memory>
18 #include <unordered_map>
19 #include <vector>
20 
21 #include "tensorflow/core/common_runtime/composite_device.h"
22 #include "tensorflow/core/common_runtime/device_factory.h"
23 #include "tensorflow/core/common_runtime/device_mgr.h"
24 #include "tensorflow/core/common_runtime/function_testlib.h"
25 #include "tensorflow/core/common_runtime/rendezvous_mgr.h"
26 #include "tensorflow/core/framework/function.h"
27 #include "tensorflow/core/framework/function_testlib.h"
28 #include "tensorflow/core/framework/metrics.h"
29 #include "tensorflow/core/framework/op_kernel.h"
30 #include "tensorflow/core/framework/resource_var.h"
31 #include "tensorflow/core/framework/tensor_testutil.h"
32 #include "tensorflow/core/framework/type_index.h"
33 #include "tensorflow/core/framework/types.pb.h"
34 #include "tensorflow/core/lib/core/errors.h"
35 #include "tensorflow/core/lib/core/status_test_util.h"
36 #include "tensorflow/core/lib/core/threadpool.h"
37 #include "tensorflow/core/lib/strings/str_util.h"
38 #include "tensorflow/core/platform/test.h"
39 #include "tensorflow/core/protobuf/config.pb.h"
40 #include "tensorflow/core/public/session_options.h"
41 #include "tensorflow/core/public/version.h"
42 
43 #if GOOGLE_CUDA
44 #include "third_party/gpus/cuda/include/cuda.h"
45 #include "third_party/gpus/cuda/include/cuda_runtime_api.h"
46 #elif TENSORFLOW_USE_ROCM
47 #include "rocm/include/hip/hip_runtime.h"
48 #endif  // GOOGLE_CUDA
49 
50 namespace tensorflow {
51 namespace {
52 
// Fake DistributedFunctionLibraryRuntime for tests. Instantiate() hands out
// monotonically increasing local handles (0, 1, 2, ...) and immediately
// reports success; Run() and CleanUp() are deliberate no-ops. Tests use the
// predictable handle numbering to verify caching of remote instantiations.
class TestClusterFLR : public DistributedFunctionLibraryRuntime {
 public:
  explicit TestClusterFLR(DeviceMgr* device_mgr) : device_mgr_(device_mgr) {}

  // Assigns the next sequential handle under mu_ and invokes `done` with OK.
  // The function name/attrs/options are ignored entirely.
  void Instantiate(const string& function_name,
                   const FunctionLibraryDefinition& lib_def, AttrSlice attrs,
                   const FunctionLibraryRuntime::InstantiateOptions& options,
                   FunctionLibraryRuntime::LocalHandle* handle,
                   FunctionLibraryRuntime::DoneCallback done) override {
    {
      mutex_lock l(mu_);
      *handle = next_handle_;
      next_handle_++;
    }
    done(OkStatus());
  }

  // No-op: `done` is intentionally never called (tests only instantiate).
  void Run(const FunctionLibraryRuntime::Options& opts,
           FunctionLibraryRuntime::LocalHandle handle,
           gtl::ArraySlice<Tensor> args, std::vector<Tensor>* rets,
           FunctionLibraryRuntime::DoneCallback done) override {}

  // No-op overload for the FunctionArg/FunctionRet (packed-arg) interface.
  void Run(const FunctionLibraryRuntime::Options& opts,
           FunctionLibraryRuntime::LocalHandle handle,
           gtl::ArraySlice<FunctionArg> args, std::vector<FunctionRet>* rets,
           FunctionLibraryRuntime::DoneCallback done) override {}

  // No-op: nothing to clean up in the fake.
  void CleanUp(uint64 step_id, FunctionLibraryRuntime::LocalHandle handle,
               FunctionLibraryRuntime::DoneCallback done) override {}

  DeviceMgr* remote_device_mgr() const override { return device_mgr_; }

 private:
  mutex mu_;
  int next_handle_ TF_GUARDED_BY(mu_) = 0;  // Next handle to hand out.
  DeviceMgr* device_mgr_;                   // Not owned.
};
90 
GenerateSessionMetadata()91 SessionMetadata GenerateSessionMetadata() {
92   SessionMetadata session_metadata;
93   session_metadata.set_name("name");
94   session_metadata.set_version(42);
95   return session_metadata;
96 }
97 
98 // TODO(b/128707168): Tests requiring a GPU device are currently always skipped
99 // because the check for whether a GPU device is present happens before the GPU
100 // device is set up.
// Test fixture: sets up a small device topology and helpers for
// instantiating and running functions through a ProcessFunctionLibraryRuntime.
class ProcessFunctionLibraryRuntimeTest : public ::testing::Test {
 public:
  // Creates three CPU devices on /job:a/replica:0/task:0. CPU:0 and CPU:1
  // are registered with device_mgr_; CPU:2 is created but deliberately
  // withheld (kept alive in device2_) so tests can exercise behavior with
  // unregistered/removed devices. gpu_device_ stays nullptr unless a GPU
  // device was actually created.
  ProcessFunctionLibraryRuntimeTest() {
    SessionOptions options;
    auto* device_count = options.config.mutable_device_count();
    device_count->insert({"CPU", 3});
    std::vector<std::unique_ptr<Device>> created_devices;
    TF_CHECK_OK(DeviceFactory::AddDevices(options, "/job:a/replica:0/task:0",
                                          &created_devices));
    // Do not add CPU:2 to device manager. Used for removed device testing.
    device2_ = std::move(created_devices[2]);
    created_devices.erase(created_devices.begin() + 2);

    device_mgr_ = std::make_unique<DynamicDeviceMgr>();
    TF_CHECK_OK(device_mgr_->AddDevices(std::move(created_devices)));
    TF_CHECK_OK(device_mgr_->LookupDevice(
        "/job:a/replica:0/task:0/device:CPU:0", &device0_));
    TF_CHECK_OK(device_mgr_->LookupDevice(
        "/job:a/replica:0/task:0/device:CPU:1", &device1_));
    // Sanity check: the withheld CPU:2 must not resolve through the manager.
    Device* device2_ptr = nullptr;
    EXPECT_NE(
        error::OK,
        device_mgr_
            ->LookupDevice("/job:a/replica:0/task:0/device:CPU:2", &device2_ptr)
            .code());
    // If no GPU is available, gpu_device_ will remain nullptr.
    Status status = device_mgr_->LookupDevice(
        "/job:a/replica:0/task:0/device:GPU:0", &gpu_device_);
    if (!status.ok()) {
      CHECK_EQ(nullptr, gpu_device_);
    }
  }

  // Builds lib_def_, cluster_flr_ and proc_flr_ from `flib`. The rendezvous
  // factory installed here increments rendezvous_ref_counts_[step_id] on
  // each rendezvous creation and decrements it on cleanup, so tests can
  // assert that rendezvous objects are properly released per step.
  void Init(const std::vector<FunctionDef>& flib,
            const SessionMetadata* session_metadata = nullptr) {
    FunctionDefLibrary proto;
    for (const auto& fdef : flib) *(proto.add_function()) = fdef;
    lib_def_.reset(new FunctionLibraryDefinition(OpRegistry::Global(), proto));
    OptimizerOptions opts;
    cluster_flr_.reset(new TestClusterFLR(device_mgr_.get()));
    proc_flr_.reset(new ProcessFunctionLibraryRuntime(
        device_mgr_.get(), Env::Default(), /*config=*/nullptr,
        TF_GRAPH_DEF_VERSION, lib_def_.get(), opts,
        /*thread_pool=*/nullptr, cluster_flr_.get(), session_metadata,
        Rendezvous::Factory{
            [this](const int64_t step_id, const DeviceMgr* device_mgr,
                   Rendezvous** r) {
              *r = new IntraProcessRendezvous(device_mgr);
              if (rendezvous_ref_counts_.find(step_id) !=
                  rendezvous_ref_counts_.end()) {
                rendezvous_ref_counts_[step_id]++;
              } else {
                rendezvous_ref_counts_[step_id] = 1;
              }
              return OkStatus();
            },
            [this](const int64_t step_id) {
              // Cleanup must never run for a step that was never created.
              CHECK(rendezvous_ref_counts_.find(step_id) !=
                    rendezvous_ref_counts_.end());
              rendezvous_ref_counts_[step_id]--;
              return OkStatus();
            }}));
  }

  // Registers a composite (packed-handle) device with the process FLR.
  void AddCompositeDevice(CompositeDevice* d) {
    proc_flr_->AddCompositeDevice(d);
  }

  // Instantiates `name` through proc_flr_ and returns the resulting status.
  Status Instantiate(
      const string& name, test::function::Attrs attrs,
      const FunctionLibraryRuntime::InstantiateOptions& instantiate_opts,
      FunctionLibraryRuntime::Handle* handle) {
    return proc_flr_->Instantiate(name, attrs, instantiate_opts, handle);
  }

  // Synchronously copies a GPU-resident tensor to host memory. Requires a
  // GPU build and a non-null gpu_device_; aborts (CHECK) otherwise.
  Tensor GPUToCPU(const Tensor& device_tensor) {
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM
    CHECK(gpu_device_);
    CHECK(gpu_device_->tensorflow_accelerator_device_info() != nullptr);
    DeviceContext* device_context =
        gpu_device_->tensorflow_accelerator_device_info()->default_context;

    Tensor cpu_tensor(device_tensor.dtype(), device_tensor.shape());
    CHECK(device_context
              ->CopyDeviceTensorToCPUSync(&device_tensor, "", gpu_device_,
                                          &cpu_tensor)
              .ok());
    return cpu_tensor;
#else
    CHECK(false);
#endif  // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
  }

  // Synchronously copies a host tensor into GPU device memory (counterpart
  // of GPUToCPU; same build/device requirements).
  Tensor CPUToGPU(const Tensor& cpu_tensor) {
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM
    CHECK(gpu_device_);
    CHECK(gpu_device_->tensorflow_accelerator_device_info() != nullptr);
    DeviceContext* device_context =
        gpu_device_->tensorflow_accelerator_device_info()->default_context;

    Tensor device_tensor(gpu_device_->GetAllocator({}), cpu_tensor.dtype(),
                         cpu_tensor.shape(), {});
    CHECK(device_context
              ->CopyCPUTensorToDeviceSync(&cpu_tensor, gpu_device_,
                                          &device_tensor)
              .ok());
    return device_tensor;
#else
    CHECK(false);
#endif  // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
  }

  // Instantiates and runs `name` on `pflr`, copies the outputs into `rets`,
  // then releases the handle and verifies that a second Run with the stale
  // handle fails with NOT_FOUND. T is the argument container type and K the
  // output element type (Tensor or FunctionRet).
  template <typename T, typename K>
  Status RunWithRuntime(
      const string& name, FunctionLibraryRuntime::Options opts,
      test::function::Attrs attrs,
      const FunctionLibraryRuntime::InstantiateOptions& instantiate_opts,
      const T& args, std::vector<K*> rets,
      ProcessFunctionLibraryRuntime* pflr) {
    FunctionLibraryRuntime::Handle handle;
    Status status = pflr->Instantiate(name, attrs, instantiate_opts, &handle);
    if (!status.ok()) {
      return status;
    }
    // These helpers are only used for local functions in this file.
    bool is_cross_process = false;
    TF_CHECK_OK(pflr->IsCrossProcess(handle, &is_cross_process));
    EXPECT_FALSE(is_cross_process);

    std::function<void(std::function<void()>)> runner =
        [](std::function<void()> fn) {
          test::function::FunctionTestSchedClosure(fn);
        };
    Notification done;
    opts.runner = &runner;
    std::vector<K> out;
    pflr->Run(opts, handle, args, &out, [&status, &done](const Status& s) {
      status = s;
      done.Notify();
    });
    done.WaitForNotification();
    if (!status.ok()) {
      return status;
    }
    CHECK_EQ(rets.size(), out.size());
    for (size_t i = 0; i < rets.size(); ++i) {
      *rets[i] = out[i];
    }

    // Release the handle and then try running the function. It shouldn't
    // succeed.
    status = pflr->ReleaseHandle(handle);
    if (!status.ok()) {
      return status;
    }
    Notification done2;
    pflr->Run(opts, handle, args, &out, [&status, &done2](const Status& s) {
      status = s;
      done2.Notify();
    });
    done2.WaitForNotification();
    EXPECT_TRUE(errors::IsNotFound(status)) << "Actual status: " << status;
    EXPECT_TRUE(absl::StrContains(status.error_message(), "not found."));

    return OkStatus();
  }

  // Convenience wrapper over RunWithRuntime for plain Tensor args/results.
  // NOTE(review): the `pflr` parameter is ignored — the call always uses
  // proc_flr_.get(); confirm whether callers rely on passing a runtime here.
  Status Run(const string& name, FunctionLibraryRuntime::Options opts,
             test::function::Attrs attrs,
             const FunctionLibraryRuntime::InstantiateOptions& instantiate_opts,
             const std::vector<Tensor>& args, std::vector<Tensor*> rets,
             ProcessFunctionLibraryRuntime* pflr = nullptr) {
    return RunWithRuntime<std::vector<Tensor>, Tensor>(
        name, opts, attrs, instantiate_opts, args, rets, proc_flr_.get());
  }

  // Wrapper over RunWithRuntime for packed (FunctionArg/FunctionRet) I/O.
  // NOTE(review): as with Run above, the `pflr` parameter is ignored.
  Status RunWithPackedArgs(
      const string& name, FunctionLibraryRuntime::Options opts,
      test::function::Attrs attrs,
      const FunctionLibraryRuntime::InstantiateOptions& instantiate_opts,
      const FunctionArgsInterface& args, std::vector<FunctionRet*> rets,
      ProcessFunctionLibraryRuntime* pflr = nullptr) {
    return RunWithRuntime<FunctionArgsInterface, FunctionRet>(
        name, opts, attrs, instantiate_opts, args, rets, proc_flr_.get());
  }

  // Runs an already-instantiated handle once and copies outputs into `rets`
  // (no handle release / stale-handle check, unlike RunWithRuntime).
  Status RunInstantiated(FunctionLibraryRuntime::Handle handle,
                         FunctionLibraryRuntime::Options opts,
                         const std::vector<Tensor>& args,
                         std::vector<Tensor*> rets) {
    std::function<void(std::function<void()>)> runner =
        [](std::function<void()> fn) {
          test::function::FunctionTestSchedClosure(fn);
        };

    opts.runner = &runner;
    Status status;
    Notification done;
    std::vector<Tensor> out;
    proc_flr_->Run(opts, handle, args, &out, [&status, &done](const Status& s) {
      status = s;
      done.Notify();
    });
    done.WaitForNotification();
    if (!status.ok()) {
      return status;
    }
    CHECK_EQ(rets.size(), out.size());
    for (size_t i = 0; i < rets.size(); ++i) {
      *rets[i] = out[i];
    }
    return OkStatus();
  }

  std::unique_ptr<DynamicDeviceMgr> device_mgr_;
  Device* device0_ = nullptr;  // Not owned. (Owned by device_mgr_.)
  Device* device1_ = nullptr;  // Not owned. (Owned by device_mgr_.)
  // CPU:2, deliberately NOT registered with device_mgr_ (see constructor).
  std::unique_ptr<Device> device2_;
  // Remains as nullptr if no GPU is available.
  Device* gpu_device_ = nullptr;  // Not owned. (Owned by device_mgr_.)
  std::unique_ptr<FunctionLibraryDefinition> lib_def_;
  std::unique_ptr<TestClusterFLR> cluster_flr_;
  std::unique_ptr<ProcessFunctionLibraryRuntime> proc_flr_;

  // To ensure that we are cleaning up the rendezvous properly.
  std::unordered_map<int64_t, int> rendezvous_ref_counts_;
};
327 
TEST_F(ProcessFunctionLibraryRuntimeTest, GetFLRNull) {
  // A runtime constructed without a DeviceMgr must still hand out a usable
  // FLR under the default-device key. Uses std::make_unique (consistent with
  // the rest of this file) instead of raw `new` into a unique_ptr.
  FunctionDefLibrary proto;
  auto lib_def =
      std::make_unique<FunctionLibraryDefinition>(OpRegistry::Global(), proto);
  OptimizerOptions opts;
  auto proc_flr = std::make_unique<ProcessFunctionLibraryRuntime>(
      nullptr /* device_mgr */, Env::Default(), /*config=*/nullptr,
      TF_GRAPH_DEF_VERSION, lib_def.get(), opts);
  FunctionLibraryRuntime* flr =
      proc_flr->GetFLR(ProcessFunctionLibraryRuntime::kDefaultFLRDevice);
  EXPECT_NE(flr, nullptr);
}
341 
TEST_F(ProcessFunctionLibraryRuntimeTest, DeviceSet) {
  // Verifies that the runtime's device_set() exposes local devices, and —
  // when a cluster FLR is attached — the remote devices it manages too.
  FunctionDefLibrary proto;
  auto lib_def =
      std::make_unique<FunctionLibraryDefinition>(OpRegistry::Global(), proto);
  OptimizerOptions opts;
  std::vector<std::unique_ptr<Device>> devices;
  devices.emplace_back(std::move(device2_));
  auto mgr = std::make_unique<DynamicDeviceMgr>();
  // (was `mgr.get()->AddDevices`: operator-> on the smart pointer suffices)
  TF_CHECK_OK(mgr->AddDevices(std::move(devices)));

  auto proc_flr = std::make_unique<ProcessFunctionLibraryRuntime>(
      /*device_mgr=*/device_mgr_.get(), Env::Default(),
      /*config=*/nullptr, TF_GRAPH_DEF_VERSION, lib_def.get(), opts,
      /*thread_pool=*/nullptr);
  EXPECT_NE(nullptr, proc_flr->device_set()->FindDeviceByName(
                         "/job:a/replica:0/task:0/device:CPU:0"));
  EXPECT_NE(nullptr, proc_flr->device_set()->FindDeviceByName(
                         "/job:a/replica:0/task:0/device:CPU:1"));

  // With a cluster FLR attached, its remote devices (here CPU:2, which lives
  // only in `mgr`) become visible in the device set.
  cluster_flr_ = std::make_unique<TestClusterFLR>(mgr.get());
  proc_flr = std::make_unique<ProcessFunctionLibraryRuntime>(
      /*device_mgr=*/device_mgr_.get(), Env::Default(),
      /*config=*/nullptr, TF_GRAPH_DEF_VERSION, lib_def.get(), opts,
      /*thread_pool=*/nullptr, /*parent_=*/cluster_flr_.get());
  EXPECT_NE(nullptr, proc_flr->device_set()->FindDeviceByName(
                         "/job:a/replica:0/task:0/device:CPU:2"));
}
370 
TEST_F(ProcessFunctionLibraryRuntimeTest, Basic) {
  Init({});
  // Every well-formed spelling of a local device name must resolve to the
  // FLR bound to that device.
  struct NameAndDevice {
    string name;
    Device* expected_device;
  };
  const std::vector<NameAndDevice> cases = {
      {"/job:a/replica:0/task:0/cpu:0", device0_},
      {"/job:a/replica:0/task:0/device:CPU:0", device0_},
      {"/device:CPU:0", device0_},
      {"/job:a/replica:0/task:0/cpu:1", device1_},
  };
  for (const NameAndDevice& test_case : cases) {
    FunctionLibraryRuntime* flr = proc_flr_->GetFLR(test_case.name);
    EXPECT_NE(flr, nullptr);
    EXPECT_EQ(flr->device(), test_case.expected_device);
  }
  // An unparseable device name yields no FLR.
  EXPECT_EQ(proc_flr_->GetFLR("abc"), nullptr);
}
389 
TEST_F(ProcessFunctionLibraryRuntimeTest, GetDeviceIncarnation) {
  Init({});
  // A registered device reports a nonzero (random) incarnation number.
  int64_t incarnation;
  TF_EXPECT_OK(proc_flr_->GetDeviceIncarnation("/job:a/replica:0/task:0/cpu:1",
                                               &incarnation));
  EXPECT_NE(incarnation, 0);
  // An unknown device (cpu:2 was never registered) is rejected.
  const Status lookup_status = proc_flr_->GetDeviceIncarnation(
      "/job:a/replica:0/task:0/cpu:2", &incarnation);
  EXPECT_EQ(lookup_status.code(), error::INVALID_ARGUMENT);
}
401 
TEST_F(ProcessFunctionLibraryRuntimeTest, SingleCall) {
  Init({test::function::XTimesTwo()});
  // Run XTimesTwo on CPU:0 and verify the doubled output.
  FunctionLibraryRuntime::Options opts;
  opts.source_device = "/job:a/replica:0/task:0/cpu:0";
  opts.remote_execution = true;
  FunctionLibraryRuntime::InstantiateOptions instantiate_opts;
  instantiate_opts.target = "/job:a/replica:0/task:0/cpu:0";
  const Tensor input = test::AsTensor<float>({1, 2, 3, 4});
  Tensor output;
  TF_CHECK_OK(Run("XTimesTwo", opts, {{"T", DT_FLOAT}}, instantiate_opts,
                  {input}, {&output}));
  test::ExpectTensorEqual<float>(output, test::AsTensor<float>({2, 4, 6, 8}));
}
415 
TEST_F(ProcessFunctionLibraryRuntimeTest, SingleCallFindDevice) {
  Init({test::function::FindDevice()});
  // FindDevice returns the name of the device it actually executed on.
  FunctionLibraryRuntime::Options opts;
  opts.source_device = "/job:a/replica:0/task:0/cpu:0";
  opts.remote_execution = true;
  FunctionLibraryRuntime::InstantiateOptions instantiate_opts;
  instantiate_opts.target = "/job:a/replica:0/task:0/cpu:0";
  Tensor result;
  TF_CHECK_OK(Run("FindDevice", opts, {}, instantiate_opts, {}, {&result}));
  test::ExpectTensorEqual<tstring>(
      result, test::AsTensor<tstring>({"/job:a/replica:0/task:0/device:CPU:0"},
                                      TensorShape({})));
  // Exactly one rendezvous was created (for this step) and its refcount fell
  // back to zero once the run completed.
  EXPECT_EQ(1, rendezvous_ref_counts_.size());
  const auto& step_entry = *rendezvous_ref_counts_.begin();
  EXPECT_EQ(opts.step_id, step_entry.first);
  EXPECT_EQ(0, step_entry.second);
}
432 
TEST_F(ProcessFunctionLibraryRuntimeTest, MultipleCallsSameDeviceXTimes) {
  Init({test::function::XTimesTwo(), test::function::XTimesFour()});
  // Two different functions instantiated and run back-to-back on the same
  // target device.
  const Tensor input = test::AsTensor<float>({1, 2, 3, 4});
  FunctionLibraryRuntime::Options opts;
  opts.source_device = "/job:a/replica:0/task:0/cpu:0";
  opts.remote_execution = true;
  FunctionLibraryRuntime::InstantiateOptions instantiate_opts;
  instantiate_opts.target = "/job:a/replica:0/task:0/cpu:0";
  Tensor result;
  TF_CHECK_OK(Run("XTimesTwo", opts, {{"T", DT_FLOAT}}, instantiate_opts,
                  {input}, {&result}));
  test::ExpectTensorEqual<float>(result, test::AsTensor<float>({2, 4, 6, 8}));
  TF_CHECK_OK(Run("XTimesFour", opts, {{"T", DT_FLOAT}}, instantiate_opts,
                  {input}, {&result}));
  test::ExpectTensorEqual<float>(result,
                                 test::AsTensor<float>({4, 8, 12, 16}));
}
449 
TEST_F(ProcessFunctionLibraryRuntimeTest, MultipleCallsSameDeviceFindDevice) {
  Init({test::function::FindDevice()});
  FunctionLibraryRuntime::Options opts;
  opts.source_device = "/job:a/replica:0/task:0/cpu:0";
  opts.remote_execution = true;
  FunctionLibraryRuntime::InstantiateOptions instantiate_opts;
  instantiate_opts.target = "/job:a/replica:0/task:0/cpu:1";
  // Running the same function twice on the same target must report the same
  // device both times.
  for (int call = 0; call < 2; ++call) {
    Tensor result;
    TF_CHECK_OK(Run("FindDevice", opts, {}, instantiate_opts, {}, {&result}));
    test::ExpectTensorEqual<tstring>(
        result,
        test::AsTensor<tstring>({"/job:a/replica:0/task:0/device:CPU:1"},
                                TensorShape({})));
  }
}
467 
TEST_F(ProcessFunctionLibraryRuntimeTest, MultipleCallsDiffDeviceFindDevice) {
  Init({test::function::FindDevice()});
  FunctionLibraryRuntime::Options opts;
  opts.source_device = "/job:a/replica:0/task:0/cpu:0";
  opts.remote_execution = true;
  // Instantiating the same function on two distinct targets must report the
  // respective device each time.
  for (const char* device_name : {"/job:a/replica:0/task:0/device:CPU:0",
                                  "/job:a/replica:0/task:0/device:CPU:1"}) {
    FunctionLibraryRuntime::InstantiateOptions instantiate_opts;
    instantiate_opts.target = device_name;
    Tensor result;
    TF_CHECK_OK(Run("FindDevice", opts, {}, instantiate_opts, {}, {&result}));
    test::ExpectTensorEqual<tstring>(
        result, test::AsTensor<tstring>({device_name}, TensorShape({})));
  }
}
487 
TEST_F(ProcessFunctionLibraryRuntimeTest, InstantiateFunctionOnRemovedDevice) {
  // Register the withheld CPU:2, then remove it again AFTER Init() so the
  // process FLR's cached device set still references the removed device.
  std::vector<std::unique_ptr<Device>> devices;
  Device* device2_ptr = device2_.get();
  devices.emplace_back(std::move(device2_));
  TF_CHECK_OK(device_mgr_->AddDevices(std::move(devices)));

  Init({test::function::FindDevice()});
  std::vector<Device*> remove_devices{device2_ptr};
  TF_CHECK_OK(device_mgr_->RemoveDevices(std::move(remove_devices)));

  // Since the process FLR device set is not updated yet, it still holds the
  // raw pointer to device2. Make sure that function instantiation with
  // device2 will not lead to segfault.
  FunctionLibraryRuntime::InstantiateOptions instantiate_opts;
  FunctionLibraryRuntime::Handle h;
  instantiate_opts.target = "/job:a/replica:0/task:0/device:CPU:1";
  instantiate_opts.is_multi_device_function = true;
  TF_CHECK_OK(Instantiate("FindDevice",
                          {{"_target", "/job:b/replica:0/task:0/device:CPU:2"}},
                          instantiate_opts, &h));
}
509 
TEST_F(ProcessFunctionLibraryRuntimeTest, ClusterFLRSerialTest) {
  // Targets on job:b/job:c are remote from this process, so instantiation is
  // routed to TestClusterFLR, which hands out sequential handles (0, 1, ...).
  Init({test::function::FindDevice()});
  FunctionLibraryRuntime::InstantiateOptions instantiate_opts;
  instantiate_opts.target = "/job:b/replica:0/task:0/device:CPU:0";
  FunctionLibraryRuntime::Handle h;
  TF_CHECK_OK(Instantiate("FindDevice",
                          {{"_target", "/job:b/replica:0/task:0/device:CPU:0"}},
                          instantiate_opts, &h));
  bool is_cross_process = false;
  TF_CHECK_OK(proc_flr_->IsCrossProcess(h, &is_cross_process));
  EXPECT_TRUE(is_cross_process);
  // First remote instantiation gets cluster handle 0.
  EXPECT_EQ(0, proc_flr_->GetHandleOnDevice(
                   "/job:b/replica:0/task:0/device:CPU:0", h));
  // Re-instantiating the identical function on the same target must reuse
  // the cached handle 0 instead of minting a new one.
  TF_CHECK_OK(Instantiate("FindDevice",
                          {{"_target", "/job:b/replica:0/task:0/device:CPU:0"}},
                          instantiate_opts, &h));
  EXPECT_EQ(0, proc_flr_->GetHandleOnDevice(
                   "/job:b/replica:0/task:0/device:CPU:0", h));
  // A different target (job:c) is a new remote instantiation: handle 1.
  instantiate_opts.target = "/job:c/replica:0/task:0/device:CPU:0";
  TF_CHECK_OK(Instantiate("FindDevice",
                          {{"_target", "/job:c/replica:0/task:0/device:CPU:0"}},
                          instantiate_opts, &h));
  EXPECT_EQ(1, proc_flr_->GetHandleOnDevice(
                   "/job:c/replica:0/task:0/device:CPU:0", h));
}
535 
TEST_F(ProcessFunctionLibraryRuntimeTest, ClusterFLRParallelTest) {
  // 100 concurrent instantiations of the same function on the same remote
  // target must all resolve to the single cached cluster handle (0).
  Init({test::function::FindDevice()});
  FunctionLibraryRuntime::InstantiateOptions instantiate_opts;
  instantiate_opts.target = "/job:b/replica:0/task:0/device:CPU:0";

  auto fn = [this, &instantiate_opts]() {
    FunctionLibraryRuntime::Handle h;
    TF_CHECK_OK(Instantiate(
        "FindDevice", {{"_target", "/job:b/replica:0/task:0/device:CPU:0"}},
        instantiate_opts, &h));
    EXPECT_EQ(0, proc_flr_->GetHandleOnDevice(
                     "/job:b/replica:0/task:0/device:CPU:0", h));
  };

  {
    // Scope the pool on the stack instead of the original manual new/delete:
    // the destructor joins all workers, so every scheduled closure has
    // finished before the test body returns.
    thread::ThreadPool tp(Env::Default(), "test", 4);
    for (int i = 0; i < 100; ++i) {
      tp.Schedule(fn);
    }
  }
}
556 
// Returns true iff the tensor's backing buffer resides in GPU (CUDA/ROCm)
// device memory. Must only be called in GPU-enabled builds; the CPU-only
// branch aborts via CHECK(false).
bool IsCUDATensor(const Tensor& t) {
#if GOOGLE_CUDA
  cudaPointerAttributes attributes;
  cudaError_t err =
      cudaPointerGetAttributes(&attributes, t.tensor_data().data());
  // cudaErrorInvalidValue: the pointer is unknown to the CUDA runtime,
  // i.e. plain host memory.
  if (err == cudaErrorInvalidValue) return false;
  CHECK_EQ(cudaSuccess, err) << cudaGetErrorString(err);
  return (attributes.type == cudaMemoryTypeDevice);
#elif TENSORFLOW_USE_ROCM
  hipPointerAttribute_t attributes;
  hipError_t err = hipPointerGetAttributes(&attributes, t.tensor_data().data());
  if (err == hipErrorInvalidValue) return false;
  CHECK_EQ(hipSuccess, err) << hipGetErrorString(err);
  return (attributes.memoryType == hipMemoryTypeDevice);
#else
  // NOTE(review): this branch has no return statement; CHECK(false) aborts
  // at runtime, but some compilers may warn about a missing return value.
  CHECK(false)
      << "IsCUDATensor should not be called when CUDA is not available";
#endif  // GOOGLE_CUDA
}
576 
TestTwoDeviceMult(ProcessFunctionLibraryRuntimeTest * fixture,const FunctionLibraryRuntime::InstantiateOptions & inst_opts,const string & error="")577 void TestTwoDeviceMult(
578     ProcessFunctionLibraryRuntimeTest* fixture,
579     const FunctionLibraryRuntime::InstantiateOptions& inst_opts,
580     const string& error = "") {
581   fixture->Init({test::function::TwoDeviceMult()});
582   FunctionLibraryRuntime::Options opts;
583   auto x = test::AsTensor<float>({1, 2, 3});
584   Tensor y_cpu;
585   Tensor y_gpu;
586   Status status = fixture->Run("TwoDeviceMult", opts, {{"T", DT_FLOAT}},
587                                inst_opts, {x}, {&y_cpu, &y_gpu});
588   if (!error.empty()) {
589     EXPECT_TRUE(errors::IsInvalidArgument(status))
590         << "Actual status: " << status;
591     EXPECT_TRUE(absl::StrContains(status.error_message(), error))
592         << "Actual error message: " << status.error_message();
593     return;
594   }
595 
596   EXPECT_TRUE(status.ok()) << "Actual status: " << status;
597   EXPECT_FALSE(IsCUDATensor(y_cpu));
598   test::ExpectTensorEqual<float>(y_cpu, test::AsTensor<float>({2, 4, 6}));
599 
600   EXPECT_TRUE(IsCUDATensor(y_gpu));
601   Tensor y_gpu_on_cpu = fixture->GPUToCPU(y_gpu);
602   test::ExpectTensorEqual<float>(y_gpu_on_cpu,
603                                  test::AsTensor<float>({3, 6, 9}));
604 }
605 
TestInstantiateSimpleFunction(ProcessFunctionLibraryRuntimeTest * fixture,const FunctionLibraryRuntime::InstantiateOptions & orig_opts)606 void TestInstantiateSimpleFunction(
607     ProcessFunctionLibraryRuntimeTest* fixture,
608     const FunctionLibraryRuntime::InstantiateOptions& orig_opts) {
609   fixture->Init({test::function::FindDevice()});
610   FunctionLibraryRuntime::InstantiateOptions opts_copy = orig_opts;
611   opts_copy.input_devices.clear();
612   FunctionLibraryRuntime::Handle h;
613   TF_CHECK_OK(fixture->Instantiate(
614       "FindDevice", {{"_target", "/job:b/replica:0/task:0/device:CPU:0"}},
615       opts_copy, &h));
616 }
617 
TestControlFlow(ProcessFunctionLibraryRuntimeTest * fixture,const FunctionLibraryRuntime::InstantiateOptions & inst_opts)618 void TestControlFlow(
619     ProcessFunctionLibraryRuntimeTest* fixture,
620     const FunctionLibraryRuntime::InstantiateOptions& inst_opts) {
621   fixture->Init({test::function::ControlFlow()});
622 
623   FunctionLibraryRuntime::Options opts;
624   Tensor x1 = test::AsTensor<float>({3, 5, 17, 257});
625   if (absl::StrContains(inst_opts.input_devices[0], "GPU")) {
626     x1 = fixture->CPUToGPU(x1);
627   }
628   Tensor y1;
629   TF_CHECK_OK(fixture->Run("ControlFlow", opts, {}, inst_opts, {x1}, {&y1}));
630 
631   if (absl::StrContains(inst_opts.output_devices[0], "GPU")) {
632     EXPECT_TRUE(IsCUDATensor(y1));
633     y1 = fixture->GPUToCPU(y1);
634   }
635   test::ExpectTensorEqual<float>(y1, test::AsTensor<float>({3, 5, 17, 257}));
636 }
637 
TestTwoDeviceInputOutput(ProcessFunctionLibraryRuntimeTest * fixture,const FunctionLibraryRuntime::InstantiateOptions & inst_opts)638 void TestTwoDeviceInputOutput(
639     ProcessFunctionLibraryRuntimeTest* fixture,
640     const FunctionLibraryRuntime::InstantiateOptions& inst_opts) {
641   if (fixture->gpu_device_ == nullptr) {
642     GTEST_SKIP() << "No GPUs available";
643   }
644   fixture->Init({test::function::TwoDeviceInputOutput()});
645 
646   FunctionLibraryRuntime::Options opts;
647   Tensor x1 = test::AsTensor<float>({1, 2});
648   if (absl::StrContains(inst_opts.input_devices[0], "GPU")) {
649     x1 = fixture->CPUToGPU(x1);
650   }
651   Tensor x2 = test::AsTensor<float>({10, 20});
652   if (absl::StrContains(inst_opts.input_devices[1], "GPU")) {
653     x2 = fixture->CPUToGPU(x2);
654   }
655   Tensor y1;
656   Tensor y2;
657   TF_CHECK_OK(fixture->Run("TwoDeviceInputOutput", opts, {{"T", DT_FLOAT}},
658                            inst_opts, {x1, x2}, {&y1, &y2}));
659 
660   if (absl::StrContains(inst_opts.output_devices[0], "GPU")) {
661     EXPECT_TRUE(IsCUDATensor(y1));
662     y1 = fixture->GPUToCPU(y1);
663   } else {
664     EXPECT_FALSE(IsCUDATensor(y1));
665   }
666   test::ExpectTensorEqual<float>(y1, test::AsTensor<float>({2, 4}));
667 
668   if (absl::StrContains(inst_opts.output_devices[1], "GPU")) {
669     EXPECT_TRUE(IsCUDATensor(y2));
670     y2 = fixture->GPUToCPU(y2);
671   } else {
672     EXPECT_FALSE(IsCUDATensor(y2));
673   }
674   test::ExpectTensorEqual<float>(y2, test::AsTensor<float>({30, 60}));
675 }
676 
// Expands short device names (e.g. "CPU:0") into fully-qualified names on
// the test job ("/job:a/replica:0/task:0/device:<name>").
std::vector<string> CompleteDevices(const std::vector<string>& v) {
  std::vector<string> result;
  result.reserve(v.size());
  for (const string& s : v) {
    result.push_back(strings::StrCat("/job:a/replica:0/task:0/device:", s));
  }
  return result;
}
685 
MakeOptions(const string & target,const std::vector<string> & input_devices,const std::vector<string> & output_devices)686 FunctionLibraryRuntime::InstantiateOptions MakeOptions(
687     const string& target, const std::vector<string>& input_devices,
688     const std::vector<string>& output_devices) {
689   FunctionLibraryRuntime::InstantiateOptions inst_opts;
690   inst_opts.target = target;
691   inst_opts.input_devices = CompleteDevices(input_devices);
692   inst_opts.output_devices = CompleteDevices(output_devices);
693   inst_opts.is_multi_device_function = true;
694   return inst_opts;
695 }
696 
// Output devices are explicitly specified for both outputs.
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_ExplicitOutputDevice) {
  if (gpu_device_ == nullptr) {
    GTEST_SKIP() << "No GPUs available";
  }
  TestTwoDeviceMult(this, MakeOptions("CPU:0", {"CPU:0"}, {"CPU:0", "GPU:0"}));
}
703 
// Output devices are left empty and must be inferred by placement.
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_InferredOutputDevice) {
  if (gpu_device_ == nullptr) {
    GTEST_SKIP() << "No GPUs available";
  }
  TestTwoDeviceMult(this, MakeOptions("CPU:0", {"CPU:0"}, {}));
}
710 
// Instantiation must fail when no input devices are given for a function
// that takes an input.
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_ErrorWhenNoInputDevices) {
  if (gpu_device_ == nullptr) {
    GTEST_SKIP() << "No GPUs available";
  }
  TestTwoDeviceMult(this, MakeOptions("CPU:0", {}, {}),
                    "input_devices must have the same length");
}
718 
// Instantiation must fail when more input devices are given than the
// function has inputs.
TEST_F(ProcessFunctionLibraryRuntimeTest,
       MultiDevice_ErrorWhenTooManyInputDevices) {
  if (gpu_device_ == nullptr) {
    GTEST_SKIP() << "No GPUs available";
  }
  TestTwoDeviceMult(this, MakeOptions("CPU:0", {"CPU:0", "CPU:1"}, {}),
                    "input_devices must have the same length");
}
727 
// Instantiation must fail when more output devices are given than the
// function has outputs.
TEST_F(ProcessFunctionLibraryRuntimeTest,
       MultiDevice_ErrorWhenTooManyOutputDevices) {
  TestTwoDeviceMult(
      this, MakeOptions("CPU:0", {"CPU:0"}, {"CPU:0", "GPU:0", "CPU:1"}),
      "output_devices must either be empty or have the same length");
}
734 
// Instantiation must fail when the target device does not exist.
TEST_F(ProcessFunctionLibraryRuntimeTest,
       MultiDevice_ErrorWhenBadTargetDevice) {
  TestTwoDeviceMult(
      this, MakeOptions("GPU:11", {"CPU:0"}, {"CPU:0", "GPU:0"}),
      "Cannot instantiate multi-device function with target device GPU:11");
}
741 
// Multi-device functions do not support list-typed (variadic) inputs;
// instantiation must fail with InvalidArgument.
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_ErrorWhenListInput) {
  const FunctionDef& def = test::function::FuncWithListInput();
  Init({def});
  FunctionLibraryRuntime::Handle handle;
  Status status = proc_flr_->Instantiate(
      "FuncWithListInput", test::function::Attrs({{"T", DT_FLOAT}, {"N", 1}}),
      MakeOptions("CPU:0", {"CPU:0"}, {}), &handle);
  ASSERT_TRUE(errors::IsInvalidArgument(status)) << "Actual status: " << status;
  ASSERT_TRUE(absl::StrContains(
      status.error_message(),
      "FuncWithListInput has an input named \"x1\" that is a list of tensors"))
      << "Actual error message: " << status.error_message();
}
755 
// Multi-device functions do not support list-typed (variadic) outputs;
// instantiation must fail with InvalidArgument.
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_ErrorWhenListOutput) {
  const FunctionDef& def = test::function::FuncWithListOutput();
  Init({def});
  FunctionLibraryRuntime::Handle handle;
  Status status = proc_flr_->Instantiate(
      "FuncWithListOutput", test::function::Attrs({{"T", DT_FLOAT}, {"N", 1}}),
      MakeOptions("CPU:0", {}, {"CPU:0"}), &handle);
  ASSERT_TRUE(errors::IsInvalidArgument(status)) << "Actual status: " << status;
  ASSERT_TRUE(absl::StrContains(
      status.error_message(),
      "FuncWithListOutput has an output named \"y\" that is a list of tensors"))
      << "Actual error message: " << status.error_message();
}
769 
// Two inputs and two outputs, with devices explicitly given for all four.
TEST_F(ProcessFunctionLibraryRuntimeTest,
       MultiDevice_ExplicitMultiInputOutput) {
  TestTwoDeviceInputOutput(
      this, MakeOptions("CPU:0", {"CPU:0", "GPU:0"}, {"CPU:0", "GPU:0"}));
}
775 
// Same as ExplicitMultiInputOutput but with input devices swapped.
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_FlipInputs) {
  TestTwoDeviceInputOutput(
      this, MakeOptions("CPU:0", {"GPU:0", "CPU:0"}, {"CPU:0", "GPU:0"}));
}
780 
// Same as ExplicitMultiInputOutput but with output devices swapped.
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_FlipOutputs) {
  TestTwoDeviceInputOutput(
      this, MakeOptions("CPU:0", {"CPU:0", "GPU:0"}, {"GPU:0", "CPU:0"}));
}
785 
// Both input and output devices swapped relative to ExplicitMultiInputOutput.
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_FlipBoth) {
  TestTwoDeviceInputOutput(
      this, MakeOptions("CPU:0", {"GPU:0", "CPU:0"}, {"GPU:0", "CPU:0"}));
}
790 
// EmptyBodySwap forwards its two inputs to the opposite outputs with no
// compute nodes; verifies the cross-device swap of a GPU and a CPU tensor.
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_EmptyBodySwap) {
  if (gpu_device_ == nullptr) {
    GTEST_SKIP() << "No GPUs available";
  }
  FunctionLibraryRuntime::InstantiateOptions inst_opts =
      MakeOptions("CPU:0", {"GPU:0", "CPU:0"}, {"CPU:0", "GPU:0"});
  Init({test::function::EmptyBodySwap()});

  Tensor x1 = CPUToGPU(test::AsTensor<float>({1, 2}));
  Tensor x2 = test::AsTensor<float>({10, 20});
  Tensor y1;
  Tensor y2;
  TF_CHECK_OK(Run("EmptyBodySwap", {}, {{"T", DT_FLOAT}}, inst_opts, {x1, x2},
                  {&y1, &y2}));

  // y1 is x2 moved to CPU; y2 is x1 moved to GPU.
  EXPECT_FALSE(IsCUDATensor(y1));
  test::ExpectTensorEqual<float>(y1, test::AsTensor<float>({10, 20}));

  EXPECT_TRUE(IsCUDATensor(y2));
  y2 = GPUToCPU(y2);
  test::ExpectTensorEqual<float>(y2, test::AsTensor<float>({1, 2}));
}
813 
GetResourceHandle(const string & var_name,const string & container,const string & device_name)814 Tensor GetResourceHandle(const string& var_name, const string& container,
815                          const string& device_name) {
816   ResourceHandle handle;
817   handle.set_device(device_name);
818   handle.set_container(container);
819   handle.set_name(var_name);
820   handle.set_hash_code(TypeIndex::Make<Var>().hash_code());
821   handle.set_maybe_type_name(TypeIndex::Make<Var>().name());
822   Tensor tensor(DT_RESOURCE, TensorShape({}));
823   tensor.scalar<ResourceHandle>()() = handle;
824   return tensor;
825 }
826 
827 // Returns a function which adds two variables on different devices.
AddVarAcrossDevices()828 FunctionDef AddVarAcrossDevices() {
829   return FunctionDefHelper::Create(
830       // Name
831       "AddVarAcrossDevices",
832       // Args
833       {"x: resource"},
834       // Return values
835       {"y: float"},
836       // Attr def
837       {},
838       // Nodes
839       {
840           {{"read0"},
841            "ReadVariableOp",
842            {"x"},
843            {{"dtype", DT_FLOAT}},
844            {},
845            "/device:CPU:0"},
846           {{"read1"},
847            "ReadVariableOp",
848            {"x"},
849            {{"dtype", DT_FLOAT}},
850            {},
851            "/device:CPU:1"},
852           {{"add"},
853            "Add",
854            {"read0:value:0", "read1:value:0"},
855            {{"T", DT_FLOAT}},
856            {},
857            "/device:CPU:0"},
858       },
859       {{"y", "add:z:0"}});
860 }
861 
862 // An implementation of FunctionArgsInterface for packed inputs.
863 class TestFunctionPackedArgs : public FunctionArgsInterface {
864  public:
TestFunctionPackedArgs(const int index,gtl::InlinedVector<TensorValue,4> && tensor_args)865   TestFunctionPackedArgs(const int index,
866                          gtl::InlinedVector<TensorValue, 4>&& tensor_args) {
867     packed_args_.emplace(index, std::move(tensor_args));
868   }
869 
~TestFunctionPackedArgs()870   ~TestFunctionPackedArgs() override{};
871 
HasRemoteOrPackedInputs() const872   bool HasRemoteOrPackedInputs() const override { return true; };
873 
GetLocalArg(const FunctionArgIndex & index,Tensor * val) const874   Status GetLocalArg(const FunctionArgIndex& index,
875                      Tensor* val) const override {
876     *val = *packed_args_.at(index.index).at(index.sub_index).tensor;
877     return OkStatus();
878   };
879 
GetLocalTensors() const880   std::vector<Tensor> GetLocalTensors() const override { return {}; }
881 
882  private:
883   absl::flat_hash_map<int, gtl::InlinedVector<TensorValue, 4>> packed_args_;
884 };
885 
TEST_F(ProcessFunctionLibraryRuntimeTest,MultiDevice_CompositeDevice)886 TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_CompositeDevice) {
887   Init({AddVarAcrossDevices()});
888   // Create two variables on two devices.
889   const Tensor initial_resource_value0 = test::AsTensor<float>({10, 20});
890   Var* resource0 = new Var(DT_FLOAT);
891   *resource0->tensor() = initial_resource_value0;
892   resource0->is_initialized = true;
893   const Tensor initial_resource_value1 = test::AsTensor<float>({30, 40});
894   Var* resource1 = new Var(DT_FLOAT);
895   *resource1->tensor() = initial_resource_value1;
896   resource1->is_initialized = true;
897   ResourceMgr* mgr0 = device0_->resource_manager();
898   ResourceMgr* mgr1 = device1_->resource_manager();
899   TF_ASSERT_OK(mgr0->Create(mgr0->default_container(), "var", resource0));
900   TF_ASSERT_OK(mgr1->Create(mgr1->default_container(), "var", resource1));
901 
902   Tensor resource_handle0 =
903       GetResourceHandle("var", mgr0->default_container(), device0_->name());
904   Tensor resource_handle1 =
905       GetResourceHandle("var", mgr1->default_container(), device1_->name());
906 
907   // Create a CompositeDevice
908   Status s;
909   std::unique_ptr<CompositeDevice> composite_device =
910       CompositeDevice::MakeDevice({device0_->name(), device1_->name()},
911                                   /*unique_device_id=*/0,
912                                   device_mgr_->HostCPU()->parsed_name(), &s);
913   TF_ASSERT_OK(s);
914   AddCompositeDevice(composite_device.get());
915 
916   FunctionLibraryRuntime::Options opts;
917   FunctionLibraryRuntime::InstantiateOptions inst_opts =
918       MakeOptions("CPU:0", {"COMPOSITE:0"}, {"CPU:0"});
919   inst_opts.composite_devices[composite_device->name()] =
920       composite_device->underlying_devices();
921   inst_opts.input_resource_dtypes_and_shapes[0] = {
922       initial_resource_value0.dtype(), initial_resource_value0.shape()};
923 
924   // Packed TensorHandle
925   {
926     gtl::InlinedVector<TensorValue, 4> handles;
927     handles.push_back(TensorValue(&resource_handle0));
928     handles.push_back(TensorValue(&resource_handle1));
929     TestFunctionPackedArgs args(0, std::move(handles));
930     FunctionRet ret;
931     TF_CHECK_OK(RunWithPackedArgs("AddVarAcrossDevices", opts,
932                                   {{"T", DT_FLOAT}}, inst_opts, args, {&ret}));
933     EXPECT_EQ(ret.index(), 0);
934     test::ExpectTensorEqual<float>(absl::get<Tensor>(ret),
935                                    test::AsTensor<float>({40, 60}));
936   }
937 
938   // Packed Tensor
939   {
940     Tensor arg(DT_RESOURCE, TensorShape({2}));
941     arg.flat<ResourceHandle>()(0) = resource_handle0.scalar<ResourceHandle>()();
942     arg.flat<ResourceHandle>()(1) = resource_handle1.scalar<ResourceHandle>()();
943 
944     Tensor ret;
945     TF_CHECK_OK(Run("AddVarAcrossDevices", opts, {{"T", DT_FLOAT}}, inst_opts,
946                     {arg}, {&ret}));
947     test::ExpectTensorEqual<float>(ret, test::AsTensor<float>({40, 60}));
948   }
949 }
950 
// Verifies that a resource handle flowing through a multi-device function
// keeps pointing at the GPU-resident variable, and that reading through the
// returned handle yields the value on the right device.
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_ResourceOutput_GPU) {
  if (gpu_device_ == nullptr) {
    GTEST_SKIP() << "No GPUs available";
  }
  FunctionLibraryRuntime::InstantiateOptions inst_opts =
      MakeOptions("CPU:0", {"GPU:0", "GPU:0"}, {"GPU:0", "GPU:0"});
  Init({test::function::ResourceOutput(),
        test::function::ReadResourceVariable()});

  // Make resource var
  Tensor resource_value = CPUToGPU(test::AsTensor<float>({10, 20}));
  Var* resource = new Var(DT_FLOAT);
  *resource->tensor() = resource_value;
  resource->is_initialized = true;
  ResourceMgr* mgr = gpu_device_->resource_manager();
  Status status = mgr->Create(mgr->default_container(), "my_gpu_var", resource);
  ASSERT_TRUE(status.ok()) << status.error_message();

  // Run the function taking a resource and outputting it
  FunctionLibraryRuntime::Options opts;
  Tensor x1 = CPUToGPU(test::AsTensor<float>({1, 2}));
  Tensor x2 = GetResourceHandle("my_gpu_var", mgr->default_container(),
                                "/job:a/replica:0/task:0/device:GPU:0");
  Tensor returned_handle;
  Tensor y2;
  TF_CHECK_OK(Run("ResourceOutput", opts, {{"T", DT_FLOAT}}, inst_opts,
                  {x1, x2}, {&returned_handle, &y2}));

  EXPECT_FALSE(IsCUDATensor(returned_handle));
  EXPECT_TRUE(IsCUDATensor(y2));
  y2 = GPUToCPU(y2);
  test::ExpectTensorEqual<float>(y2, test::AsTensor<float>({2, 4}));

  // Read the variable using the handle returned from previous function to
  // make sure the handle and read value is on the right device.
  inst_opts = MakeOptions("GPU:0", {"GPU:0"}, {"GPU:0"});
  Tensor read_resource;
  TF_CHECK_OK(Run("ReadResourceVariable", opts, {{"T", DT_FLOAT}}, inst_opts,
                  {returned_handle}, {&read_resource}));
  EXPECT_TRUE(IsCUDATensor(read_resource));
  read_resource = GPUToCPU(read_resource);
  test::ExpectTensorEqual<float>(read_resource,
                                 test::AsTensor<float>({10, 20}));
}
995 
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_PlacerError) {
  if (gpu_device_ == nullptr) {
    GTEST_SKIP() << "No GPUs available";
  }
  // ResourceOutput forwards second input to first output. Both are resources.
  // Placer should not be able to place this graph because we ask it to place
  // second input on GPU but first output to CPU.
  FunctionLibraryRuntime::InstantiateOptions inst_opts =
      MakeOptions("CPU:0", {"GPU:0", "GPU:0"}, {"CPU:0", "GPU:0"});
  Init({test::function::ResourceOutput(),
        test::function::ReadResourceVariable()});

  FunctionLibraryRuntime::Handle handle;
  Status status = proc_flr_->Instantiate(
      "ResourceOutput", test::function::Attrs({{"T", DT_FLOAT}}), inst_opts,
      &handle);
  ASSERT_TRUE(errors::IsInvalidArgument(status)) << "Actual status: " << status;
  ASSERT_TRUE(absl::StrContains(status.error_message(), "Cannot place"));
}
1015 
1016 REGISTER_OP("BrokenOp")
1017     .Input("in: T")
1018     .Output("out: T")
1019     .Attr("T: type")
1020     .SetShapeFn(shape_inference::UnknownShape);
1021 class BrokenOp : public OpKernel {
1022  public:
BrokenOp(OpKernelConstruction * ctx)1023   explicit BrokenOp(OpKernelConstruction* ctx) : OpKernel(ctx) {
1024     ctx->SetStatus(errors::Internal("I am broken"));
1025   }
1026 
Compute(OpKernelContext * ctx)1027   void Compute(OpKernelContext* ctx) override {
1028     ctx->SetStatus(errors::Internal("I am broken"));
1029   }
1030 };
1031 REGISTER_KERNEL_BUILDER(Name("BrokenOp").Device(DEVICE_CPU), BrokenOp);
1032 
// A function whose kernel construction fails should instantiate fine with
// lazy kernel creation, but fail when kernels are created eagerly.
TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_CreateKernelsEagerly) {
  auto T = DT_INT32;
  FunctionDef broken_func = FunctionDefHelper::Define(
      // Name
      "Broken",
      // Args
      {"x: int32"},
      // Return values
      {"y: int32"},
      // Attrs
      {},
      // Nodes
      {{{"y"}, "BrokenOp", {"x"}, {{"T", T}}}});
  Init({broken_func});

  FunctionLibraryRuntime::InstantiateOptions inst_opts =
      MakeOptions("CPU:0", {"CPU:0"}, {"CPU:0"});

  // Instantiating the broken function should work.
  FunctionLibraryRuntime::Handle handle;
  TF_CHECK_OK(Instantiate("Broken", {{"T", DT_INT32}}, inst_opts, &handle));
  TF_CHECK_OK(proc_flr_->ReleaseHandle(handle));

  // Instantiating the broken function while creating kernels eagerly should
  // fail.
  inst_opts.create_kernels_eagerly = true;
  Status status = Instantiate("Broken", {{"T", DT_INT32}}, inst_opts, &handle);
  EXPECT_TRUE(errors::IsInternal(status));
}
1063 
TEST_F(ProcessFunctionLibraryRuntimeTest,MultiDevice_StateHandle)1064 TEST_F(ProcessFunctionLibraryRuntimeTest, MultiDevice_StateHandle) {
1065   auto T = DT_INT32;
1066   // The expected sequence of outputs from this function is [6, 4, 0, 1, ...].
1067   FunctionDef stateful_func = FunctionDefHelper::Define(
1068       // Name
1069       "RandomUniformWrapper",
1070       // Args
1071       {"x: resource"},
1072       // Return values
1073       {"y: int32"},
1074       // Attrs
1075       {},
1076       // Nodes
1077       {FunctionDefHelper::Const<int32>("shape", gtl::ArraySlice<int32>({1})),
1078        FunctionDefHelper::Const<int32>("minval", 0),
1079        {{"maxval"}, "ReadVariableOp", {"x"}, {{"dtype", T}}, {}},
1080        // A stateful node.
1081        {{"y"},
1082         "RandomUniformInt",
1083         {"shape", "minval", "maxval"},
1084         {{"seed", 37}, {"seed2", 48}, {"Tout", T}, {"T", T}}}});
1085   Init({stateful_func});
1086   if (gpu_device_ == nullptr) {
1087     GTEST_SKIP() << "No GPUs available";
1088   }
1089 
1090   // Make resource variables.
1091   ResourceMgr* mgr = gpu_device_->resource_manager();
1092   Tensor resource_value = CPUToGPU(test::AsScalar<int>(10));
1093   Var* resource = new Var(T);
1094   *resource->tensor() = resource_value;
1095   resource->is_initialized = true;
1096   Status status = mgr->Create(mgr->default_container(), "my_gpu_var", resource);
1097   ASSERT_TRUE(status.ok()) << status.error_message();
1098 
1099   Tensor x = GetResourceHandle("my_gpu_var", mgr->default_container(),
1100                                "/job:a/replica:0/task:0/device:GPU:0");
1101   Tensor y;
1102 
1103   FunctionLibraryRuntime::InstantiateOptions inst_opts =
1104       MakeOptions("CPU:0", {"GPU:0"}, {"CPU:0"});
1105 
1106   // Instantiate the function with no state handle.
1107   FunctionLibraryRuntime::Handle handle;
1108   TF_CHECK_OK(Instantiate("RandomUniformWrapper", {{"T", DT_INT32}}, inst_opts,
1109                           &handle));
1110   for (auto expected : {6, 4}) {
1111     TF_CHECK_OK(RunInstantiated(handle, {}, {x}, {&y}));
1112     test::ExpectTensorEqual<int>(y, test::AsTensor<int>({expected}));
1113   }
1114 
1115   // Instantiating the function again with no state handle should result in the
1116   // same handle.
1117   FunctionLibraryRuntime::Handle other_handle;
1118   TF_CHECK_OK(Instantiate("RandomUniformWrapper", {{"T", DT_INT32}}, inst_opts,
1119                           &other_handle));
1120   EXPECT_EQ(handle, other_handle);
1121   // Running the function should yield continuation of the same sequence.
1122   for (auto expected : {0, 1}) {
1123     TF_CHECK_OK(RunInstantiated(other_handle, {}, {x}, {&y}));
1124     test::ExpectTensorEqual<int>(y, test::AsTensor<int>({expected}));
1125   }
1126 
1127   // Instantiating the function with a state handle should result in a different
1128   // handle.
1129   inst_opts.state_handle = "handle_1";
1130   TF_CHECK_OK(Instantiate("RandomUniformWrapper", {{"T", DT_INT32}}, inst_opts,
1131                           &other_handle));
1132   EXPECT_NE(handle, other_handle);
1133   // Running the function should yield the original sequeunce.
1134   for (auto expected : {6, 4, 0, 1}) {
1135     TF_CHECK_OK(RunInstantiated(other_handle, {}, {x}, {&y}));
1136     test::ExpectTensorEqual<int>(y, test::AsTensor<int>({expected}));
1137   }
1138 
1139   // Instantiating the function with a different state handle should result in a
1140   // different handle.
1141   inst_opts.state_handle = "handle_2";
1142   TF_CHECK_OK(Instantiate("RandomUniformWrapper", {{"T", DT_INT32}}, inst_opts,
1143                           &other_handle));
1144   EXPECT_NE(handle, other_handle);
1145   // Running the function should yield the original sequeunce.
1146   for (auto expected : {6, 4, 0, 1}) {
1147     TF_CHECK_OK(RunInstantiated(other_handle, {}, {x}, {&y}));
1148     test::ExpectTensorEqual<int>(y, test::AsTensor<int>({expected}));
1149   }
1150 
1151   // Repeatedly instantiating a function and releasing its handle will result in
1152   // repeating the original sequence.
1153   inst_opts.state_handle = "handle_3";
1154   for (int i = 0; i < 2; ++i) {
1155     TF_CHECK_OK(Instantiate("RandomUniformWrapper", {{"T", DT_INT32}},
1156                             inst_opts, &other_handle));
1157     EXPECT_NE(handle, other_handle);
1158     // Running the function should yield the original sequeunce.
1159     for (auto expected : {6, 4, 0, 1}) {
1160       TF_CHECK_OK(RunInstantiated(other_handle, {}, {x}, {&y}));
1161       test::ExpectTensorEqual<int>(y, test::AsTensor<int>({expected}));
1162     }
1163     TF_CHECK_OK(proc_flr_->ReleaseHandle(other_handle));
1164   }
1165 }
1166 
1167 REGISTER_OP("SessionMetadataReader")
1168     .Input("x: int64")
1169     .Output("y: string")
1170     .SetIsStateful()
1171     .Doc(R"doc(SessionMetadataReader returns the session metadata.
1172 
1173 x: int64
1174 y: string
1175 )doc");
1176 
1177 class SessionMetadataReaderOp : public OpKernel {
1178  public:
SessionMetadataReaderOp(OpKernelConstruction * ctx)1179   explicit SessionMetadataReaderOp(OpKernelConstruction* ctx) : OpKernel(ctx) {}
Compute(OpKernelContext * ctx)1180   void Compute(OpKernelContext* ctx) override {
1181     Tensor* out_tensor = nullptr;
1182     OP_REQUIRES_OK(ctx,
1183                    ctx->allocate_output("y", TensorShape({}), &out_tensor));
1184     if (ctx->session_metadata() != nullptr) {
1185       out_tensor->scalar<tstring>()() = ctx->session_metadata()->DebugString();
1186     } else {
1187       out_tensor->scalar<tstring>()() = "";
1188     }
1189   }
1190 };
1191 REGISTER_KERNEL_BUILDER(Name("SessionMetadataReader").Device(DEVICE_CPU),
1192                         SessionMetadataReaderOp);
1193 
SessionMetadataReaderOpFn()1194 FunctionDef SessionMetadataReaderOpFn() {
1195   return FunctionDefHelper::Define(
1196       // Name
1197       "SessionMetadataReaderFn",
1198       // Args
1199       {"x: int64"},
1200       // Return values
1201       {"y: string"},
1202       // Attr def
1203       {},
1204       // Nodes
1205       {{{"y"}, "SessionMetadataReader", {"x"}, {}}});
1206 }
1207 
// With no session metadata configured, the reader op returns an empty string.
TEST_F(ProcessFunctionLibraryRuntimeTest, SessionMetadataAbsent) {
  Init({SessionMetadataReaderOpFn()}, /*session_metadata=*/nullptr);
  FunctionLibraryRuntime::Options opts;
  opts.source_device = "/job:a/replica:0/task:0/cpu:0";
  opts.remote_execution = true;
  FunctionLibraryRuntime::InstantiateOptions instantiate_opts;
  instantiate_opts.target = "/job:a/replica:0/task:0/cpu:0";
  const auto x = test::AsTensor<int64_t>({17});
  Tensor y;
  TF_CHECK_OK(
      Run("SessionMetadataReaderFn", opts, {}, instantiate_opts, {x}, {&y}));
  EXPECT_EQ("", y.scalar<tstring>()());
}
1221 
// With session metadata configured, the reader op returns it and it parses
// back to the original name/version.
TEST_F(ProcessFunctionLibraryRuntimeTest, SessionMetadataPresent) {
  const SessionMetadata session_metadata = GenerateSessionMetadata();
  Init({SessionMetadataReaderOpFn()}, &session_metadata);
  FunctionLibraryRuntime::Options opts;
  opts.source_device = "/job:a/replica:0/task:0/cpu:0";
  opts.remote_execution = true;
  FunctionLibraryRuntime::InstantiateOptions instantiate_opts;
  instantiate_opts.target = "/job:a/replica:0/task:0/cpu:0";
  const auto x = test::AsTensor<int64_t>({17});
  Tensor y;
  TF_CHECK_OK(
      Run("SessionMetadataReaderFn", opts, {}, instantiate_opts, {x}, {&y}));
  SessionMetadata read_metadata;
  ASSERT_TRUE(protobuf::TextFormat::ParseFromString(y.scalar<tstring>()(),
                                                    &read_metadata));
  EXPECT_EQ(session_metadata.name(), read_metadata.name());
  EXPECT_EQ(session_metadata.version(), read_metadata.version());
}
1240 
// A cloned ProcessFunctionLibraryRuntime must still know about composite
// devices added to the original.
TEST_F(ProcessFunctionLibraryRuntimeTest, CompositeDevicesAfterCloning) {
  Init({AddVarAcrossDevices()});

  Status s;
  std::unique_ptr<CompositeDevice> composite_device =
      CompositeDevice::MakeDevice({device0_->name(), device1_->name()},
                                  /*unique_device_id=*/0,
                                  device_mgr_->HostCPU()->parsed_name(), &s);
  TF_ASSERT_OK(s);
  AddCompositeDevice(composite_device.get());

  auto* flr = proc_flr_->GetFLR("/job:a/replica:0/task:0/cpu:0");
  ASSERT_NE(nullptr, flr);
  std::unique_ptr<FunctionLibraryDefinition> cloned_lib_def;
  std::unique_ptr<ProcessFunctionLibraryRuntime> cloned_proc_flr;
  FunctionLibraryRuntime* cloned_flr;
  TF_ASSERT_OK(flr->Clone(&cloned_lib_def, &cloned_proc_flr, &cloned_flr));
  EXPECT_EQ(
      cloned_proc_flr->device_set()->FindDeviceByName(composite_device->name()),
      composite_device.get());
}
1262 
// Session metadata must survive cloning of the runtime.
TEST_F(ProcessFunctionLibraryRuntimeTest, SessionMetadataPresentAfterCloning) {
  const SessionMetadata session_metadata = GenerateSessionMetadata();
  Init({SessionMetadataReaderOpFn()}, &session_metadata);
  auto* flr = proc_flr_->GetFLR("/job:a/replica:0/task:0/cpu:0");
  ASSERT_NE(nullptr, flr);
  std::unique_ptr<FunctionLibraryDefinition> cloned_lib_def;
  std::unique_ptr<ProcessFunctionLibraryRuntime> cloned_proc_flr;
  FunctionLibraryRuntime* cloned_flr;
  TF_ASSERT_OK(flr->Clone(&cloned_lib_def, &cloned_proc_flr, &cloned_flr));
  FunctionLibraryRuntime::Options opts;
  opts.source_device = "/job:a/replica:0/task:0/cpu:0";
  opts.remote_execution = true;
  FunctionLibraryRuntime::InstantiateOptions instantiate_opts;
  instantiate_opts.target = "/job:a/replica:0/task:0/cpu:0";
  const auto x = test::AsTensor<int64_t>({17});
  Tensor y;
  Status s = RunWithRuntime<std::vector<Tensor>, Tensor>(
      "SessionMetadataReaderFn", opts, {}, instantiate_opts, {x}, {&y},
      cloned_proc_flr.get());
  TF_CHECK_OK(s);
  SessionMetadata read_metadata;
  ASSERT_TRUE(protobuf::TextFormat::ParseFromString(y.scalar<tstring>()(),
                                                    &read_metadata));
  EXPECT_EQ(session_metadata.name(), read_metadata.name());
  EXPECT_EQ(session_metadata.version(), read_metadata.version());
}
1289 
// A trivial single-device function should be classified safe for synchronous
// execution when small-function optimizations are allowed.
TEST_F(ProcessFunctionLibraryRuntimeTest, SimpleGraphAllowsSync) {
  auto async_safe =
      metrics::TestDelta("subgraph_async_summary", "safe_for_sync");
  FunctionLibraryRuntime::InstantiateOptions opts =
      MakeOptions("CPU:0", {}, {});
  opts.allow_small_function_optimizations = true;
  TestInstantiateSimpleFunction(this, opts);
  EXPECT_GT(async_safe.Get(), 0);
}
1299 
// A function containing ops unsafe for sync execution (control flow) must be
// classified "unsafe_op", never "safe_for_sync".
TEST_F(ProcessFunctionLibraryRuntimeTest, UnsafeOpRequiresAsync) {
  auto async_safe =
      metrics::TestDelta("subgraph_async_summary", "safe_for_sync");
  auto async_unsafe_op =
      metrics::TestDelta("subgraph_async_summary", "unsafe_op");
  FunctionLibraryRuntime::InstantiateOptions opts =
      MakeOptions("CPU:0", {"CPU:0"}, {"CPU:0"});
  opts.allow_small_function_optimizations = true;
  TestControlFlow(this, opts);
  EXPECT_EQ(async_safe.Get(), 0);
  EXPECT_GT(async_unsafe_op.Get(), 0);
}
1312 
// A function partitioned across CPU and GPU produces send-only and recv-only
// subgraphs, both of which must be counted in the async summary metric.
TEST_F(ProcessFunctionLibraryRuntimeTest, PartitionedGraphRequiresAsync) {
  if (gpu_device_ == nullptr) {
    GTEST_SKIP() << "No GPUs available";
  }
  auto async_send_only =
      metrics::TestDelta("subgraph_async_summary", "send_only");
  auto async_recv_only =
      metrics::TestDelta("subgraph_async_summary", "recv_only");
  FunctionLibraryRuntime::InstantiateOptions opts =
      MakeOptions("CPU:0", {"CPU:0"}, {"CPU:0", "GPU:0"});
  opts.allow_small_function_optimizations = true;
  TestTwoDeviceMult(this, opts);
  EXPECT_GT(async_send_only.Get(), 0);
  EXPECT_GT(async_recv_only.Get(), 0);
}
1328 
1329 }  // anonymous namespace
1330 }  // namespace tensorflow
1331