1 /* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #ifndef TENSORFLOW_CORE_DISTRIBUTED_RUNTIME_EAGER_DESTROY_TENSOR_HANDLE_NODE_H_
17 #define TENSORFLOW_CORE_DISTRIBUTED_RUNTIME_EAGER_DESTROY_TENSOR_HANDLE_NODE_H_
18 
19 #include <memory>
20 #include <utility>
21 
22 #include "tensorflow/core/common_runtime/eager/eager_executor.h"
23 #include "tensorflow/core/distributed_runtime/eager/eager_client.h"
24 #include "tensorflow/core/protobuf/eager_service.pb.h"
25 
26 namespace tensorflow {
27 namespace eager {
28 
29 // DestroyTensorHandleNode is an implementation of EagerNode which enqueues a
30 // request to destroy a remote tensor handle.
31 class DestroyTensorHandleNode : public tensorflow::AsyncEagerNode {
32  public:
DestroyTensorHandleNode(std::unique_ptr<EnqueueRequest> request,core::RefCountPtr<EagerClient> eager_client,bool ready)33   DestroyTensorHandleNode(std::unique_ptr<EnqueueRequest> request,
34                           core::RefCountPtr<EagerClient> eager_client,
35                           bool ready)
36       : tensorflow::AsyncEagerNode(),
37         request_(std::move(request)),
38         eager_client_(std::move(eager_client)),
39         ready_(ready) {}
40 
~DestroyTensorHandleNode()41   ~DestroyTensorHandleNode() override {}
42 
RunAsync(StatusCallback done)43   void RunAsync(StatusCallback done) override {
44     EnqueueResponse* response = new EnqueueResponse;
45     bool ready = ready_;
46     // NOTE(fishx): Don't use StreamingEnqueueAsync here. When a
47     // StreamingEnqueueAsync request fails all following requests will fail as
48     // well. We don't want this request poison following requests since it is
49     // safe to ignore a failing destroy tensor handle request.
50     eager_client_->EnqueueAsync(
51         /*call_opts=*/nullptr, request_.get(), response,
52         [response, ready, done](const tensorflow::Status& s) {
53           // Omit the warning if:
54           // 1. The remote tensor isn't ready.
55           // 2. Lost connection to remote worker. In this case client will
56           //    crash. We don't want to spam user with redundant warning logs.
57           if (!s.ok() && ready && !errors::IsUnavailable(s)) {
58             LOG_EVERY_N_SEC(WARNING, 60)
59                 << "Ignoring an error encountered when deleting "
60                    "remote tensors handles: "
61                 << s.ToString();
62           }
63           done(OkStatus());
64           delete response;
65         });
66   }
67 
Abort(Status status)68   void Abort(Status status) override {}
69 
70   // Remote node deletions are best effort
Fatal()71   bool Fatal() const override { return false; }
72 
DebugString()73   string DebugString() const override {
74     string out = "[DestroyTensorHandleNode]";
75     strings::StrAppend(&out, " request: ", request_->DebugString());
76     return out;
77   }
78 
79  private:
80   std::unique_ptr<EnqueueRequest> request_;
81   core::RefCountPtr<EagerClient> eager_client_;
82   const string remote_task_;
83   bool ready_;
84 };
85 
86 }  // namespace eager
87 }  // namespace tensorflow
88 
89 #endif  // TENSORFLOW_CORE_DISTRIBUTED_RUNTIME_EAGER_DESTROY_TENSOR_HANDLE_NODE_H_
90