/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/core/kernels/stack.h"

#include <limits.h>
#include <atomic>
#include <vector>

#include "tensorflow/core/common_runtime/device.h"
#include "tensorflow/core/framework/device_base.h"
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/framework/register_types.h"
#include "tensorflow/core/framework/resource_mgr.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/framework/tensor_shape.h"
#include "tensorflow/core/framework/types.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/refcount.h"
#include "tensorflow/core/lib/gtl/map_util.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/thread_annotations.h"
#include "tensorflow/core/platform/types.h"

namespace tensorflow {

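// A per-step stack of tensors managed as a ResourceBase, backing the Stack*
// kernels below (used, e.g., to stash forward-pass tensors that are read back
// during while-loop gradient computation). Mutations of the stack contents
// are serialized by mu_.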
class Stack : public ResourceBase {
 public:
  static std::atomic<int64_t> stack_counter;

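  // A stack element: the tensor, the allocator attributes it was produced
  // with, and whether StackPushOp swapped it out to host (CPU) memory, in
  // which case StackPopOp copies it back to the device on Pop.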
  struct TensorAndAllocation {
    Tensor tensor;
    AllocatorAttributes alloc_attrs;
    bool swapped_to_cpu;
  };

  Stack(const DataType& elem_type, const string& stack_name, int max_size)
      : elem_type_(elem_type),
        stack_name_(stack_name),
        max_size_(max_size),
        closed_(false) {}

  Status Push(const TensorAndAllocation& value) {
    mutex_lock l(mu_);
    TF_RETURN_IF_ERROR(CheckNotClosed());
    int stack_size = stack_.size();
    if (max_size_ >= 0 && stack_size >= max_size_) {
      return errors::InvalidArgument("Stack[", stack_name_, "] overflowed ",
                                     "its max_size (", max_size_, ")");
    }
    stack_.push_back(value);
    return OkStatus();
  }

  Status Pop(TensorAndAllocation* value) {
    mutex_lock l(mu_);
    TF_RETURN_IF_ERROR(CheckNotClosed());
    if (stack_.empty()) {
      return errors::InvalidArgument("Stack[", stack_name_,
                                     "] is empty when calling Pop().");
    }
    *value = stack_.back();
    stack_.pop_back();
    return OkStatus();
  }

  // We don't swap the first tensor on the stack, nor any subsequent tensors
  // that share a buffer with the first tensor.
  bool IsUsefulToSwap(const Tensor& tensor) const {
    mutex_lock l(mu_);
    if (stack_.empty()) {
      return false;
    }
    const Tensor& first = stack_.front().tensor;
    return !tensor.SharesBufferWith(first);
  }

  void Close() {
    mutex_lock l(mu_);
    stack_.clear();
    closed_ = true;
  }

  DataType ElemType() { return elem_type_; }

  string DebugString() const override {
    mutex_lock l(mu_);
    return strings::StrCat("Stack[", stack_name_, "]");
  }

  const string& stack_name() { return stack_name_; }

 private:
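  // StackOp is a friend so that it can fill in handle_ and publish it as a
  // ref-typed output guarded by mu() (see StackOp::Compute below).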
  friend class StackOp;
  mutex* mu() { return &mu_; }

  mutable mutex mu_;
  DataType elem_type_;
  const string stack_name_;
  Tensor handle_;
  int max_size_;
  bool closed_ TF_GUARDED_BY(mu_);
  std::vector<TensorAndAllocation> stack_ TF_GUARDED_BY(mu_);

  Status CheckNotClosed() const TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    if (closed_) {
      return errors::InvalidArgument("Stack[", stack_name_,
                                     "] has already been closed.");
    }
    return OkStatus();
  }
};

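// Looks up the Stack resource referenced by input 0 of the calling kernel.
// The handle is either a DT_RESOURCE scalar (resolved via LookupResource) or
// a ref-typed DT_STRING tensor of {container, stack_name}, which is resolved
// through the per-step container in the resource manager.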
Status GetStack(OpKernelContext* ctx, Stack** stack) {
  if (ctx->input_dtype(0) == DT_RESOURCE) {
    return LookupResource(ctx, HandleFromInput(ctx, 0), stack);
  } else {
    Tensor Tstack_handle = ctx->mutable_input(0, false);
    if (Tstack_handle.NumElements() != 2) {
      return errors::InvalidArgument(
          "Stack handle must have two elements, but had shape: ",
          Tstack_handle.shape().DebugString());
    }
    const string& container = Tstack_handle.flat<tstring>()(0);
    const string& stack_name = Tstack_handle.flat<tstring>()(1);
    string key = strings::StrCat(container, stack_name);
    ResourceMgr* rm = ctx->resource_manager();
    if (rm == nullptr) {
      return errors::Internal("No resource manager.");
    }
    auto* step_container = ctx->step_container();
    if (step_container == nullptr) {
      return errors::Internal("No step container.");
    }
    TF_RETURN_IF_ERROR(step_container->Lookup(rm, key, stack));
    return OkStatus();
  }
}

std::atomic<int64_t> Stack::stack_counter{0};

// StackOp
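//
// Rough lifecycle of these kernels in a graph (illustrative, not exhaustive):
// a StackOp creates the per-step Stack resource and emits a handle; one or
// more StackPushOps push tensors through that handle (forwarding the pushed
// value as their output); StackPopOps later pop them in LIFO order; and a
// StackCloseOp clears the stack and marks it closed.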

StackOp::StackOp(OpKernelConstruction* context) : OpKernel(context) {
  OP_REQUIRES_OK(context, context->GetAttr("elem_type", &elem_type_));
  OP_REQUIRES_OK(context, context->GetAttr("stack_name", &stack_name_));
  if (stack_name_.empty()) stack_name_ = name();
}

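// Creates a new Stack in the per-step container and returns a handle to it.
// If output 0 is ref-typed, the handle is a DT_STRING vector of
// {container, stack_name}; otherwise it is a DT_RESOURCE scalar.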
void StackOp::Compute(OpKernelContext* ctx) {
  int32_t size = std::numeric_limits<int32>::max();
  if (ctx->num_inputs() > 0) {
    const Tensor* tensor_size;
    OP_REQUIRES_OK(ctx, ctx->input("max_size", &tensor_size));

    OP_REQUIRES(
        ctx, TensorShapeUtils::IsScalar(tensor_size->shape()),
        errors::InvalidArgument("Stack size must be a scalar, but had shape: ",
                                tensor_size->shape().DebugString()));

    int32_t size_value = tensor_size->scalar<int32>()();
    if (size_value >= 0) {
      size = size_value;
    }
  }

  static const char kContainer[] = "_stacks";
  auto stack_id = Stack::stack_counter.fetch_add(1);
  string stack_name = strings::StrCat(stack_name_, "_", stack_id);
  // Store the handle in a per-step container.
  ResourceMgr* rm = ctx->resource_manager();
  OP_REQUIRES(ctx, rm != nullptr, errors::Internal("No resource manager."));
  string key = strings::StrCat(kContainer, stack_name);
  auto* step_container = ctx->step_container();
  OP_REQUIRES(ctx, step_container != nullptr,
              errors::Internal("No step container."));
  Stack* stack = new Stack(elem_type_, stack_name, size);
  OP_REQUIRES_OK(ctx, step_container->Create(rm, key, stack));
  if (IsRefType(ctx->expected_output_dtype(0))) {
    // Create the stack handle.
    AllocatorAttributes alloc_attr;
    alloc_attr.set_on_host(true);
    OP_REQUIRES_OK(ctx, ctx->allocate_temp(tensorflow::DT_STRING,
                                           tensorflow::TensorShape({2}),
                                           &stack->handle_, alloc_attr));
    auto handle = stack->handle_.flat<tstring>();
    handle(0) = kContainer;
    handle(1) = std::move(stack_name);
    ctx->set_output_ref(0, stack->mu(), &stack->handle_);
  } else {
    Tensor* handle;
    OP_REQUIRES_OK(ctx, ctx->allocate_output(0, TensorShape({}), &handle));
    handle->flat<ResourceHandle>()(0) =
        ctx->step_container()->MakeResourceHandle<Stack>(key, *ctx->device());
  }
}

// StackPushOp

StackPushOp::StackPushOp(OpKernelConstruction* context, bool allow_swapping)
    : AsyncOpKernel(context) {
  if (allow_swapping) {
    OP_REQUIRES_OK(context, context->GetAttr("swap_memory", &swap_memory_));
  }
}

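// Pushes input 1 onto the stack referenced by the handle in input 0 and
// forwards the pushed tensor as output 0. If memory swapping is enabled and
// the heuristic below fires, the tensor is first copied asynchronously to
// host memory and the host copy is what gets pushed (and output).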
void StackPushOp::ComputeAsync(OpKernelContext* ctx, DoneCallback done) {
  // Get the stack from the handle.
  Stack* stack = nullptr;
  OP_REQUIRES_OK_ASYNC(ctx, GetStack(ctx, &stack), done);
  core::ScopedUnref unref(stack);

  if (ctx->input_dtype(1) != stack->ElemType()) {
    ctx->CtxFailure(errors::InvalidArgument("Must have type ",
                                            stack->ElemType(), " but got ",
                                            ctx->input_dtype(1)));
    done();
    return;
  }

  // Push the tensor onto the stack. Swap the tensor to CPU if instructed.
  const Tensor& tensor = ctx->input(1);
  AllocatorAttributes alloc_attrs = ctx->input_alloc_attr(1);
  // For now, we use a simple heuristic for swapping: A GPU tensor is moved
  // to CPU if the tensor has more than kCopyThreshold bytes and the GPU
  // allocator says more than kOccupancy of the memory is in use.
  static constexpr int kCopyThreshold = 2048;
  static constexpr double kOccupancy = 0.7;
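  // Illustrative numbers (not from the code): a 4 MB tensor on a GPU whose
  // allocator reports a 10 GB bytes_limit with 8 GB in use (80% > 70%
  // occupancy) would be swapped; a 1 KB tensor, or the same tensor with only
  // 5 GB in use, would stay on the device.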
  if (swap_memory_ && !alloc_attrs.on_host() &&
      tensor.TotalBytes() > kCopyThreshold && stack->IsUsefulToSwap(tensor)) {
    DeviceContext* device_ctxt = ctx->op_device_context();
    auto device = static_cast<tensorflow::Device*>(ctx->device());
    Allocator* allocator = device->GetAllocator(alloc_attrs);
    absl::optional<AllocatorStats> stats = allocator->GetStats();
    if (stats && *stats->bytes_limit &&
        stats->bytes_in_use > (*stats->bytes_limit * kOccupancy)) {
      // Asynchronously copy the tensor from GPU to CPU memory.
      // TODO(yuanbyu): Swap the oldest tensor first.
      AllocatorAttributes host_alloc_attrs;
      host_alloc_attrs.set_gpu_compatible(true);
      host_alloc_attrs.set_on_host(true);
      Allocator* cpu_allocator = device->GetAllocator(host_alloc_attrs);
      Tensor* cpu_tensor =
          new Tensor(cpu_allocator, tensor.dtype(), tensor.shape());
      device_ctxt->CopyDeviceTensorToCPU(
          &tensor, "StackPush", device, cpu_tensor,
          [cpu_tensor, stack, ctx, done](const Status& s) {
            ctx->SetStatus(s);
            if (s.ok()) {
              AllocatorAttributes alloc_attrs = ctx->input_alloc_attr(1);
              ctx->SetStatus(stack->Push({*cpu_tensor, alloc_attrs, true}));
            }
            if (ctx->status().ok()) {
              ctx->set_output(0, *cpu_tensor);
            }
            done();
            delete cpu_tensor;
          });
      return;
    }
  }

  // Execute synchronously if not swapped.
  OP_REQUIRES_OK_ASYNC(ctx, stack->Push({tensor, alloc_attrs, false}), done);
  ctx->set_output(0, tensor);
  done();
}

bool StackPushOp::IsExpensive() { return false; }

// StackPopOp

StackPopOp::StackPopOp(OpKernelConstruction* context)
    : AsyncOpKernel(context) {}

void StackPopOp::ComputeAsync(OpKernelContext* ctx, DoneCallback done) {
  // Get the stack from the handle.
  Stack* stack = nullptr;
  OP_REQUIRES_OK_ASYNC(ctx, GetStack(ctx, &stack), done);
  core::ScopedUnref unref(stack);

  // Pop the tensor. Transfer the tensor back to device if it was
  // swapped out to CPU.
  Stack::TensorAndAllocation value;
  OP_REQUIRES_OK_ASYNC(ctx, stack->Pop(&value), done);
  if (value.swapped_to_cpu) {
    // Asynchronously copy the tensor back from CPU to GPU memory.
    DeviceContext* device_ctxt = ctx->op_device_context();
    Device* device = static_cast<Device*>(ctx->device());
    Tensor* cpu_tensor = &value.tensor;
    Allocator* gpu_allocator = device->GetAllocator(value.alloc_attrs);
    Tensor* device_tensor =
        new Tensor(gpu_allocator, cpu_tensor->dtype(), cpu_tensor->shape());
    device_ctxt->CopyCPUTensorToDevice(
        cpu_tensor, device, device_tensor,
        [device_tensor, ctx, done](const Status& s) {
          ctx->SetStatus(s);
          if (s.ok()) {
            ctx->set_output(0, *device_tensor);
          }
          done();
          delete device_tensor;
        });
  } else {
    // Execute synchronously if not swapped.
    ctx->set_output(0, value.tensor);
    done();
  }
}

bool StackPopOp::IsExpensive() { return false; }

// StackCloseOp

StackCloseOp::StackCloseOp(OpKernelConstruction* context) : OpKernel(context) {}

void StackCloseOp::Compute(OpKernelContext* ctx) {
  Stack* stack = nullptr;
  OP_REQUIRES_OK(ctx, GetStack(ctx, &stack));
  core::ScopedUnref unref(stack);
  stack->Close();
}

bool StackCloseOp::IsExpensive() { return false; }

}  // namespace tensorflow