/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/core/kernels/stack.h"

#include <limits.h>
#include <atomic>
#include <vector>

#include "tensorflow/core/common_runtime/device.h"
#include "tensorflow/core/framework/device_base.h"
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/framework/register_types.h"
#include "tensorflow/core/framework/resource_mgr.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/framework/tensor_shape.h"
#include "tensorflow/core/framework/types.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/refcount.h"
#include "tensorflow/core/lib/gtl/map_util.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/thread_annotations.h"
#include "tensorflow/core/platform/types.h"

namespace tensorflow {

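// A per-step, ref-counted resource that holds the tensors pushed by
// StackPushOp and returns them, in LIFO order, to StackPopOp. Each element
// remembers its allocator attributes and whether it was swapped to host
// memory so that StackPopOp can copy it back to the device.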
class Stack : public ResourceBase {
 public:
  static std::atomic<int64_t> stack_counter;

  struct TensorAndAllocation {
    Tensor tensor;
    AllocatorAttributes alloc_attrs;
    bool swapped_to_cpu;
  };

  Stack(const DataType& elem_type, const string& stack_name, int max_size)
      : elem_type_(elem_type),
        stack_name_(stack_name),
        max_size_(max_size),
        closed_(false) {}

  Status Push(const TensorAndAllocation& value) {
    mutex_lock l(mu_);
    TF_RETURN_IF_ERROR(CheckNotClosed());
    int stack_size = stack_.size();
    if (max_size_ >= 0 && stack_size >= max_size_) {
      return errors::InvalidArgument("Stack[", stack_name_, "] overflowed ",
                                     "its max_size (", max_size_, ")");
    }
    stack_.push_back(value);
    return OkStatus();
  }

  Status Pop(TensorAndAllocation* value) {
    mutex_lock l(mu_);
    TF_RETURN_IF_ERROR(CheckNotClosed());
    if (stack_.empty()) {
      return errors::InvalidArgument("Stack[", stack_name_,
                                     "] is empty when calling Pop().");
    }
    *value = stack_.back();
    stack_.pop_back();
    return OkStatus();
  }

  // We don't swap the first tensor on the stack and any subsequent tensors
  // that share the buffer with the first tensor.
  bool IsUsefulToSwap(const Tensor& tensor) const {
    mutex_lock l(mu_);
    if (stack_.empty()) {
      return false;
    }
    const Tensor& first = stack_.front().tensor;
    return !tensor.SharesBufferWith(first);
  }

  void Close() {
    mutex_lock l(mu_);
    stack_.clear();
    closed_ = true;
  }

  DataType ElemType() { return elem_type_; }

  string DebugString() const override {
    mutex_lock l(mu_);
    return strings::StrCat("Stack[", stack_name_, "]");
  }

  const string& stack_name() { return stack_name_; }

 private:
  friend class StackOp;
  mutex* mu() { return &mu_; }

  mutable mutex mu_;
  DataType elem_type_;
  const string stack_name_;
  Tensor handle_;
  int max_size_;
  bool closed_ TF_GUARDED_BY(mu_);
  std::vector<TensorAndAllocation> stack_ TF_GUARDED_BY(mu_);

  Status CheckNotClosed() const TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    if (closed_) {
      return errors::InvalidArgument("Stack[", stack_name_,
                                     "] has already been closed.");
    }
    return OkStatus();
  }
};

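// Looks up the Stack resource referenced by input 0. The handle is either a
// DT_RESOURCE handle or a ref-typed string tensor of the form
// {container, stack_name} that keys into the per-step container.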
Status GetStack(OpKernelContext* ctx, Stack** stack) {
  if (ctx->input_dtype(0) == DT_RESOURCE) {
    return LookupResource(ctx, HandleFromInput(ctx, 0), stack);
  } else {
    Tensor Tstack_handle = ctx->mutable_input(0, false);
    if (Tstack_handle.NumElements() != 2) {
      return errors::InvalidArgument(
          "Stack handle must have two elements, but had shape: ",
          Tstack_handle.shape().DebugString());
    }
    const string& container = Tstack_handle.flat<tstring>()(0);
    const string& stack_name = Tstack_handle.flat<tstring>()(1);
    string key = strings::StrCat(container, stack_name);
    ResourceMgr* rm = ctx->resource_manager();
    if (rm == nullptr) {
      return errors::Internal("No resource manager.");
    }
    auto* step_container = ctx->step_container();
    if (step_container == nullptr) {
      return errors::Internal("No step container.");
    }
    TF_RETURN_IF_ERROR(step_container->Lookup(rm, key, stack));
    return OkStatus();
  }
}

std::atomic<int64_t> Stack::stack_counter{0};

// StackOp

StackOp::StackOp(OpKernelConstruction* context) : OpKernel(context) {
  OP_REQUIRES_OK(context, context->GetAttr("elem_type", &elem_type_));
  OP_REQUIRES_OK(context, context->GetAttr("stack_name", &stack_name_));
  if (stack_name_.empty()) stack_name_ = name();
}

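// Creates a new Stack resource in the per-step container and emits a handle
// to it: a ref-typed {container, name} string tensor if the op's output is a
// ref type, otherwise a scalar DT_RESOURCE handle.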
void StackOp::Compute(OpKernelContext* ctx) {
  int32_t size = std::numeric_limits<int32>::max();
  if (ctx->num_inputs() > 0) {
    const Tensor* tensor_size;
    OP_REQUIRES_OK(ctx, ctx->input("max_size", &tensor_size));

    OP_REQUIRES(
        ctx, TensorShapeUtils::IsScalar(tensor_size->shape()),
        errors::InvalidArgument("Stack size must be a scalar, but had shape: ",
                                tensor_size->shape().DebugString()));

    int32_t size_value = tensor_size->scalar<int32>()();
    if (size_value >= 0) {
      size = size_value;
    }
  }

  static const char kContainer[] = "_stacks";
  auto stack_id = Stack::stack_counter.fetch_add(1);
  string stack_name = strings::StrCat(stack_name_, "_", stack_id);
  // Store the handle in a per-step container.
  ResourceMgr* rm = ctx->resource_manager();
  OP_REQUIRES(ctx, rm != nullptr, errors::Internal("No resource manager."));
  string key = strings::StrCat(kContainer, stack_name);
  auto* step_container = ctx->step_container();
  OP_REQUIRES(ctx, step_container != nullptr,
              errors::Internal("No step container."));
  Stack* stack = new Stack(elem_type_, stack_name, size);
  OP_REQUIRES_OK(ctx, step_container->Create(rm, key, stack));
  if (IsRefType(ctx->expected_output_dtype(0))) {
    // Create the stack handle.
    AllocatorAttributes alloc_attr;
    alloc_attr.set_on_host(true);
    OP_REQUIRES_OK(ctx, ctx->allocate_temp(tensorflow::DT_STRING,
                                           tensorflow::TensorShape({2}),
                                           &stack->handle_, alloc_attr));
    auto handle = stack->handle_.flat<tstring>();
    handle(0) = kContainer;
    handle(1) = std::move(stack_name);
    ctx->set_output_ref(0, stack->mu(), &stack->handle_);
  } else {
    Tensor* handle;
    OP_REQUIRES_OK(ctx, ctx->allocate_output(0, TensorShape({}), &handle));
    handle->flat<ResourceHandle>()(0) =
        ctx->step_container()->MakeResourceHandle<Stack>(key, *ctx->device());
  }
}

// StackPushOp

StackPushOp::StackPushOp(OpKernelConstruction* context, bool allow_swapping)
    : AsyncOpKernel(context) {
  if (allow_swapping) {
    OP_REQUIRES_OK(context, context->GetAttr("swap_memory", &swap_memory_));
  }
}

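// Pushes the tensor at input 1 onto the stack and forwards it to output 0.
// When swap_memory is set, a large device tensor may instead be copied to
// host memory if the device allocator is close to its limit.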
void StackPushOp::ComputeAsync(OpKernelContext* ctx, DoneCallback done) {
  // Get the stack from the handle.
  Stack* stack = nullptr;
  OP_REQUIRES_OK_ASYNC(ctx, GetStack(ctx, &stack), done);
  core::ScopedUnref unref(stack);

  if (ctx->input_dtype(1) != stack->ElemType()) {
    ctx->CtxFailure(errors::InvalidArgument("Must have type ",
                                            stack->ElemType(), " but got ",
                                            ctx->input_dtype(1)));
    done();
    return;
  }

  // Push the tensor onto the stack. Swap the tensor to CPU if instructed.
  const Tensor& tensor = ctx->input(1);
  AllocatorAttributes alloc_attrs = ctx->input_alloc_attr(1);
  // For now, we use a simple heuristic for swapping: A GPU tensor is moved
  // to CPU if the tensor has more than kCopyThreshold bytes and the GPU
  // allocator says more than kOccupancy of the memory is in use.
  static constexpr int kCopyThreshold = 2048;
  static constexpr double kOccupancy = 0.7;
  if (swap_memory_ && !alloc_attrs.on_host() &&
      tensor.TotalBytes() > kCopyThreshold && stack->IsUsefulToSwap(tensor)) {
    DeviceContext* device_ctxt = ctx->op_device_context();
    auto device = static_cast<tensorflow::Device*>(ctx->device());
    Allocator* allocator = device->GetAllocator(alloc_attrs);
    absl::optional<AllocatorStats> stats = allocator->GetStats();
    if (stats && *stats->bytes_limit &&
        stats->bytes_in_use > (*stats->bytes_limit * kOccupancy)) {
      // Asynchronously copy the tensor from GPU to CPU memory.
      // TODO(yuanbyu): Swap the oldest tensor first.
      AllocatorAttributes host_alloc_attrs;
      host_alloc_attrs.set_gpu_compatible(true);
      host_alloc_attrs.set_on_host(true);
      Allocator* cpu_allocator = device->GetAllocator(host_alloc_attrs);
      Tensor* cpu_tensor =
          new Tensor(cpu_allocator, tensor.dtype(), tensor.shape());
      device_ctxt->CopyDeviceTensorToCPU(
          &tensor, "StackPush", device, cpu_tensor,
          [cpu_tensor, stack, ctx, done](const Status& s) {
            ctx->SetStatus(s);
            if (s.ok()) {
              AllocatorAttributes alloc_attrs = ctx->input_alloc_attr(1);
              ctx->SetStatus(stack->Push({*cpu_tensor, alloc_attrs, true}));
            }
            if (ctx->status().ok()) {
              ctx->set_output(0, *cpu_tensor);
            }
            done();
            delete cpu_tensor;
          });
      return;
    }
  }

  // Execute synchronously if not swapped.
  OP_REQUIRES_OK_ASYNC(ctx, stack->Push({tensor, alloc_attrs, false}), done);
  ctx->set_output(0, tensor);
  done();
}

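// Pushing is cheap bookkeeping (any device-to-host copy runs asynchronously),
// so report the kernel as inexpensive and let the executor run it inline.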
bool StackPushOp::IsExpensive() { return false; }

// StackPopOp

StackPopOp::StackPopOp(OpKernelConstruction* context)
    : AsyncOpKernel(context) {}

void StackPopOp::ComputeAsync(OpKernelContext* ctx, DoneCallback done) {
  // Get the stack from the handle.
  Stack* stack = nullptr;
  OP_REQUIRES_OK_ASYNC(ctx, GetStack(ctx, &stack), done);
  core::ScopedUnref unref(stack);

  // Pop the tensor. Transfer the tensor back to device if it was
  // swapped out to CPU.
  Stack::TensorAndAllocation value;
  OP_REQUIRES_OK_ASYNC(ctx, stack->Pop(&value), done);
  if (value.swapped_to_cpu) {
    // Asynchronously copy the tensor back from CPU to GPU memory.
    DeviceContext* device_ctxt = ctx->op_device_context();
    Device* device = static_cast<Device*>(ctx->device());
    Tensor* cpu_tensor = &value.tensor;
    Allocator* gpu_allocator = device->GetAllocator(value.alloc_attrs);
    Tensor* device_tensor =
        new Tensor(gpu_allocator, cpu_tensor->dtype(), cpu_tensor->shape());
    device_ctxt->CopyCPUTensorToDevice(
        cpu_tensor, device, device_tensor,
        [device_tensor, ctx, done](const Status& s) {
          ctx->SetStatus(s);
          if (s.ok()) {
            ctx->set_output(0, *device_tensor);
          }
          done();
          delete device_tensor;
        });
  } else {
    // Execute synchronously if not swapped.
    ctx->set_output(0, value.tensor);
    done();
  }
}

bool StackPopOp::IsExpensive() { return false; }

// StackCloseOp

StackCloseOp::StackCloseOp(OpKernelConstruction* context) : OpKernel(context) {}

void StackCloseOp::Compute(OpKernelContext* ctx) {
  Stack* stack = nullptr;
  OP_REQUIRES_OK(ctx, GetStack(ctx, &stack));
  core::ScopedUnref unref(stack);
  stack->Close();
}

bool StackCloseOp::IsExpensive() { return false; }

}  // namespace tensorflow