/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

// See docs in ../ops/math_ops.cc.

#define EIGEN_USE_THREADS

#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/framework/register_types.h"
#include "tensorflow/core/framework/types.h"
#include "tensorflow/core/kernels/bincount_op.h"
#include "tensorflow/core/kernels/fill_functor.h"
#include "tensorflow/core/kernels/sparse_utils.h"
#include "tensorflow/core/lib/core/threadpool.h"
#include "tensorflow/core/platform/errors.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/core/util/determinism.h"

namespace tensorflow {

using thread::ThreadPool;

typedef Eigen::ThreadPoolDevice CPUDevice;
typedef Eigen::GpuDevice GPUDevice;

namespace functor {

template <typename Tidx, typename T>
struct BincountFunctor<CPUDevice, Tidx, T, true> {
  static Status Compute(OpKernelContext* context,
                        const typename TTypes<Tidx, 1>::ConstTensor& arr,
                        const typename TTypes<T, 1>::ConstTensor& weights,
                        typename TTypes<T, 1>::Tensor& output,
                        const Tidx num_bins) {
    Tensor all_nonneg_t;
    TF_RETURN_IF_ERROR(context->allocate_temp(
        DT_BOOL, TensorShape({}), &all_nonneg_t, AllocatorAttributes()));
    all_nonneg_t.scalar<bool>().device(context->eigen_cpu_device()) =
        (arr >= Tidx(0)).all();
    if (!all_nonneg_t.scalar<bool>()()) {
      return errors::InvalidArgument("Input arr must be non-negative!");
    }

    // Allocate partial output bin sums for each worker thread. Worker ids in
    // ParallelForWithWorkerId range from 0 to NumThreads() inclusive.
    ThreadPool* thread_pool =
        context->device()->tensorflow_cpu_worker_threads()->workers;
    const int64_t num_threads = thread_pool->NumThreads() + 1;
    Tensor partial_bins_t;
    TF_RETURN_IF_ERROR(context->allocate_temp(
        DT_BOOL, TensorShape({num_threads, num_bins}), &partial_bins_t));
    auto partial_bins = partial_bins_t.matrix<bool>();
    partial_bins.setZero();
    thread_pool->ParallelForWithWorkerId(
        arr.size(), 8 /* cost */,
        [&](int64_t start_ind, int64_t limit_ind, int64_t worker_id) {
          for (int64_t i = start_ind; i < limit_ind; i++) {
            Tidx value = arr(i);
            if (value < num_bins) {
              partial_bins(worker_id, value) = true;
            }
          }
        });

    // Sum the partial bins along the 0th axis.
    Eigen::array<int, 1> reduce_dim({0});
    output.device(context->eigen_cpu_device()) =
        partial_bins.any(reduce_dim).cast<T>();
    return OkStatus();
  }
};
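
// The specialization below computes a weighted (or unweighted) 1-D bincount.
// Illustrative example: for arr = [1, 2, 1], num_bins = 4 and
// weights = [0.5, 1.0, 2.0], the expected output is [0, 2.5, 1.0, 0]; with an
// empty `weights` tensor each occurrence counts as T(1). Values >= num_bins
// are ignored.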

template <typename Tidx, typename T>
struct BincountFunctor<CPUDevice, Tidx, T, false> {
  static Status Compute(OpKernelContext* context,
                        const typename TTypes<Tidx, 1>::ConstTensor& arr,
                        const typename TTypes<T, 1>::ConstTensor& weights,
                        typename TTypes<T, 1>::Tensor& output,
                        const Tidx num_bins) {
    Tensor all_nonneg_t;
    TF_RETURN_IF_ERROR(context->allocate_temp(
        DT_BOOL, TensorShape({}), &all_nonneg_t, AllocatorAttributes()));
    all_nonneg_t.scalar<bool>().device(context->eigen_cpu_device()) =
        (arr >= Tidx(0)).all();
    if (!all_nonneg_t.scalar<bool>()()) {
      return errors::InvalidArgument("Input arr must be non-negative!");
    }

    // Allocate partial output bin sums for each worker thread. Worker ids in
    // ParallelForWithWorkerId range from 0 to NumThreads() inclusive.
    ThreadPool* thread_pool =
        context->device()->tensorflow_cpu_worker_threads()->workers;
    const int64_t num_threads = thread_pool->NumThreads() + 1;
    const Tidx* arr_data = arr.data();
    const std::ptrdiff_t arr_size = arr.size();
    const T* weight_data = weights.data();
    if (weights.size() && weights.size() != arr_size) {
      return errors::InvalidArgument(
          "Input indices and weights must have the same size.");
    }
    if (num_threads == 1) {
      output.setZero();
      T* output_data = output.data();
      if (weights.size()) {
        for (int64_t i = 0; i < arr_size; i++) {
          const Tidx value = arr_data[i];
          if (value < num_bins) {
            output_data[value] += weight_data[i];
          }
        }
      } else {
        for (int64_t i = 0; i < arr_size; i++) {
          const Tidx value = arr_data[i];
          if (value < num_bins) {
            // Complex numbers don't support "++".
            output_data[value] += T(1);
          }
        }
      }
    } else {
      Tensor partial_bins_t;
      TF_RETURN_IF_ERROR(context->allocate_temp(
          DataTypeToEnum<T>::value, TensorShape({num_threads, num_bins}),
          &partial_bins_t));
      auto partial_bins = partial_bins_t.matrix<T>();
      partial_bins.setZero();
      thread_pool->ParallelForWithWorkerId(
          arr_size, 8 /* cost */,
          [&](int64_t start_ind, int64_t limit_ind, int64_t worker_id) {
            if (weights.size()) {
              for (int64_t i = start_ind; i < limit_ind; i++) {
                Tidx value = arr_data[i];
                if (value < num_bins) {
                  partial_bins(worker_id, value) += weight_data[i];
                }
              }
            } else {
              for (int64_t i = start_ind; i < limit_ind; i++) {
                Tidx value = arr_data[i];
                if (value < num_bins) {
                  // Complex numbers don't support "++".
                  partial_bins(worker_id, value) += T(1);
                }
              }
            }
          });

      // Sum the partial bins along the 0th axis.
      Eigen::array<int, 1> reduce_dim({0});
      output.device(context->eigen_cpu_device()) =
          partial_bins.sum(reduce_dim);
    }
    return OkStatus();
  }
};
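
// The reduce functor below computes an independent bincount per row of a 2-D
// input. Rows are partitioned across worker threads, so no per-thread partial
// bins are needed. Illustrative example: in = [[0, 1], [1, 1]] with
// num_bins = 3 and no weights yields out = [[1, 1, 0], [0, 2, 0]]
// (or 0/1 entries when binary_output is true).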

template <typename Tidx, typename T, bool binary_output>
struct BincountReduceFunctor<CPUDevice, Tidx, T, binary_output> {
  static Status Compute(OpKernelContext* context,
                        const typename TTypes<Tidx, 2>::ConstTensor& in,
                        const typename TTypes<T, 2>::ConstTensor& weights,
                        typename TTypes<T, 2>::Tensor& out,
                        const Tidx num_bins) {
    const int num_rows = out.dimension(0);
    const int num_cols = in.dimension(1);
    ThreadPool* thread_pool =
        context->device()->tensorflow_cpu_worker_threads()->workers;
    thread_pool->ParallelForWithWorkerId(
        num_rows, 8 /* cost */,
        [&](int64_t start_row, int64_t end_row, int64_t worker_id) {
          for (int64_t i = start_row; i < end_row; ++i) {
            for (int64_t j = 0; j < num_cols; ++j) {
              Tidx value = in(i, j);
              if (value < num_bins) {
                if (binary_output) {
                  out(i, value) = T(1);
                } else {
                  if (weights.size()) {
                    out(i, value) += weights(i, j);
                  } else {
                    out(i, value) += T(1);
                  }
                }
              }
            }
          }
        });
    return OkStatus();
  }
};

}  // namespace functor

template <typename Device, typename T>
class BincountOp : public OpKernel {
 public:
  explicit BincountOp(OpKernelConstruction* ctx) : OpKernel(ctx) {}

  void Compute(OpKernelContext* ctx) override {
    const Tensor& arr_t = ctx->input(0);
    const Tensor& size_tensor = ctx->input(1);
    OP_REQUIRES(ctx, size_tensor.dims() == 0,
                errors::InvalidArgument("Shape must be rank 0 but is rank ",
                                        size_tensor.dims()));
    int32_t size = size_tensor.scalar<int32_t>()();
    OP_REQUIRES(
        ctx, size >= 0,
        errors::InvalidArgument("size (", size, ") must be non-negative"));

    const Tensor& weights_t = ctx->input(2);
    const auto arr = arr_t.flat<int32_t>();
    const auto weights = weights_t.flat<T>();
    Tensor* output_t;
    OP_REQUIRES_OK(ctx,
                   ctx->allocate_output(0, TensorShape({size}), &output_t));
    auto output = output_t->flat<T>();
    OP_REQUIRES_OK(ctx,
                   functor::BincountFunctor<Device, int32_t, T, false>::Compute(
                       ctx, arr, weights, output, size));
  }
};

#define REGISTER_KERNELS(type)                                       \
  REGISTER_KERNEL_BUILDER(                                           \
      Name("Bincount").Device(DEVICE_CPU).TypeConstraint<type>("T"), \
      BincountOp<CPUDevice, type>)

TF_CALL_NUMBER_TYPES(REGISTER_KERNELS);
#undef REGISTER_KERNELS

#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM

#define REGISTER_KERNELS(type)                            \
  REGISTER_KERNEL_BUILDER(Name("Bincount")                \
                              .Device(DEVICE_GPU)         \
                              .HostMemory("size")         \
                              .TypeConstraint<type>("T"), \
                          BincountOp<GPUDevice, type>)

TF_CALL_int32(REGISTER_KERNELS);
TF_CALL_float(REGISTER_KERNELS);
#undef REGISTER_KERNELS

#endif  // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
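
// DenseBincountOp accepts rank-1 or rank-2 `data`; rank-2 input produces one
// bincount row per input row. Illustrative example of the binary_output attr:
// data = [0, 0, 2], size = 3, binary_output = true gives [1, 0, 1] rather
// than the counts [2, 0, 1].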

template <typename Device, typename Tidx, typename T>
class DenseBincountOp : public OpKernel {
 public:
  explicit DenseBincountOp(OpKernelConstruction* ctx) : OpKernel(ctx) {
    OP_REQUIRES_OK(ctx, ctx->GetAttr("binary_output", &binary_output_));
    if (std::is_same<Device, GPUDevice>::value) {
      OP_REQUIRES(
          ctx, !OpDeterminismRequired(),
          errors::Unimplemented(
              "Determinism is not yet supported in GPU implementation of "
              "DenseBincount."));
    }
  }

  void Compute(OpKernelContext* ctx) override {
    const Tensor& data = ctx->input(0);
    OP_REQUIRES(ctx, data.dims() <= 2,
                errors::InvalidArgument(
                    "Shape must be at most rank 2 but is rank ", data.dims()));

    const Tensor& size_t = ctx->input(1);
    const Tensor& weights = ctx->input(2);

    OP_REQUIRES(ctx, size_t.dims() == 0,
                errors::InvalidArgument("Shape must be rank 0 but is rank ",
                                        size_t.dims()));
    OP_REQUIRES(ctx,
                weights.shape() == data.shape() || weights.NumElements() == 0,
                errors::InvalidArgument(
                    "`weights` must be the same shape as `arr` or a length-0 "
                    "`Tensor`, in which case it acts as all weights equal to "
                    "1. Received ",
                    weights.shape().DebugString()));

    Tidx size = size_t.scalar<Tidx>()();
    OP_REQUIRES(
        ctx, size >= 0,
        errors::InvalidArgument("size (", size, ") must be non-negative"));

    Tensor* out_t;
    functor::SetZeroFunctor<Device, T> fill;
    if (data.dims() == 1) {
      OP_REQUIRES_OK(ctx,
                     ctx->allocate_output(0, TensorShape({size}), &out_t));
      auto out = out_t->flat<T>();
      fill(ctx->eigen_device<Device>(), out);
      if (binary_output_) {
        OP_REQUIRES_OK(
            ctx, functor::BincountFunctor<Device, Tidx, T, true>::Compute(
                     ctx, data.flat<Tidx>(), weights.flat<T>(), out, size));
      } else {
        OP_REQUIRES_OK(
            ctx, functor::BincountFunctor<Device, Tidx, T, false>::Compute(
                     ctx, data.flat<Tidx>(), weights.flat<T>(), out, size));
      }
    } else if (data.dims() == 2) {
      const int64_t num_rows = data.dim_size(0);
      auto weight_matrix =
          (weights.NumElements() == 0)
              ? weights.shaped<T, 2>(gtl::InlinedVector<int64_t, 2>(2, 0))
              : weights.matrix<T>();
      OP_REQUIRES_OK(
          ctx, ctx->allocate_output(0, TensorShape({num_rows, size}), &out_t));
      auto out = out_t->matrix<T>();
      fill(ctx->eigen_device<Device>(), out_t->flat<T>());
      if (binary_output_) {
        OP_REQUIRES_OK(
            ctx, functor::BincountReduceFunctor<Device, Tidx, T, true>::Compute(
                     ctx, data.matrix<Tidx>(), weight_matrix, out, size));
      } else {
        OP_REQUIRES_OK(
            ctx,
            functor::BincountReduceFunctor<Device, Tidx, T, false>::Compute(
                ctx, data.matrix<Tidx>(), weight_matrix, out, size));
      }
    }
  }

 private:
  bool binary_output_;
};

#define REGISTER_KERNELS(Tidx, T)                            \
  REGISTER_KERNEL_BUILDER(Name("DenseBincount")              \
                              .Device(DEVICE_CPU)            \
                              .TypeConstraint<T>("T")        \
                              .TypeConstraint<Tidx>("Tidx"), \
                          DenseBincountOp<CPUDevice, Tidx, T>);
#define REGISTER_CPU_KERNELS(T) \
  REGISTER_KERNELS(int32, T);   \
  REGISTER_KERNELS(int64_t, T);

TF_CALL_NUMBER_TYPES(REGISTER_CPU_KERNELS);
#undef REGISTER_CPU_KERNELS
#undef REGISTER_KERNELS

#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM

#define REGISTER_KERNELS(Tidx, T)                            \
  REGISTER_KERNEL_BUILDER(Name("DenseBincount")              \
                              .Device(DEVICE_GPU)            \
                              .HostMemory("size")            \
                              .TypeConstraint<T>("T")        \
                              .TypeConstraint<Tidx>("Tidx"), \
                          DenseBincountOp<GPUDevice, Tidx, T>);
#define REGISTER_GPU_KERNELS(T) \
  REGISTER_KERNELS(int32, T);   \
  REGISTER_KERNELS(int64_t, T);

TF_CALL_int32(REGISTER_GPU_KERNELS);
TF_CALL_float(REGISTER_GPU_KERNELS);
#undef REGISTER_GPU_KERNELS
#undef REGISTER_KERNELS

#endif  // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
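
// SparseBincountOp bins the values of a SparseTensor given as
// (indices, values, dense_shape). For a rank-2 input, column 0 of `indices`
// selects the output row. Illustrative example: dense_shape = [2, 4],
// indices = [[0, 0], [1, 1]], values = [2, 2], size = 3, empty weights
// gives [[0, 0, 1], [0, 0, 1]].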

template <typename Device, typename Tidx, typename T>
class SparseBincountOp : public OpKernel {
 public:
  explicit SparseBincountOp(OpKernelConstruction* ctx) : OpKernel(ctx) {
    OP_REQUIRES_OK(ctx, ctx->GetAttr("binary_output", &binary_output_));
  }

  void Compute(OpKernelContext* ctx) override {
    const Tensor& indices = ctx->input(0);
    const Tensor& values = ctx->input(1);
    const auto values_flat = values.flat<Tidx>();
    const Tensor& dense_shape = ctx->input(2);
    const Tensor& size_t = ctx->input(3);
    const auto weights = ctx->input(4).flat<T>();
    const int64_t weights_size = weights.size();

    OP_REQUIRES(ctx, size_t.dims() == 0,
                errors::InvalidArgument("Shape must be rank 0 but is rank ",
                                        size_t.dims()));
    Tidx size = size_t.scalar<Tidx>()();
    OP_REQUIRES(
        ctx, size >= 0,
        errors::InvalidArgument("size (", size, ") must be non-negative"));
    OP_REQUIRES_OK(ctx, sparse_utils::ValidateSparseTensor<int64_t>(
                            indices, values, dense_shape,
                            sparse_utils::IndexValidation::kUnordered));

    bool is_1d = dense_shape.NumElements() == 1;

    Tensor* out_t;
    functor::SetZeroFunctor<Device, T> fill;
    if (is_1d) {
      OP_REQUIRES_OK(ctx,
                     ctx->allocate_output(0, TensorShape({size}), &out_t));
      auto out = out_t->flat<T>();
      fill(ctx->eigen_device<Device>(), out);
      if (binary_output_) {
        OP_REQUIRES_OK(ctx,
                       functor::BincountFunctor<Device, Tidx, T, true>::Compute(
                           ctx, values_flat, weights, out, size));
      } else {
        OP_REQUIRES_OK(
            ctx, functor::BincountFunctor<Device, Tidx, T, false>::Compute(
                     ctx, values_flat, weights, out, size));
      }
    } else {
      const auto shape = dense_shape.flat<int64_t>();
      const int64_t num_rows = shape(0);
      OP_REQUIRES_OK(
          ctx, ctx->allocate_output(0, TensorShape({num_rows, size}), &out_t));
      const auto out = out_t->matrix<T>();
      fill(ctx->eigen_device<Device>(), out_t->flat<T>());
      const auto indices_mat = indices.matrix<int64_t>();
      for (int64_t i = 0; i < indices_mat.dimension(0); ++i) {
        const int64_t batch = indices_mat(i, 0);
        const Tidx bin = values_flat(i);
        OP_REQUIRES(
            ctx, batch < out.dimension(0),
            errors::InvalidArgument("Index out of bound. `batch` (", batch,
                                    ") must be less than the dimension size (",
                                    out.dimension(0), ")."));
        OP_REQUIRES(
            ctx, bin < out.dimension(1),
            errors::InvalidArgument("Index out of bound. `bin` (", bin,
                                    ") must be less than the dimension size (",
                                    out.dimension(1), ")."));
        if (bin < size) {
          if (binary_output_) {
            out(batch, bin) = T(1);
          } else {
            if (weights_size) {
              out(batch, bin) += weights(i);
            } else {
              out(batch, bin) += T(1);
            }
          }
        }
      }
    }
  }

 private:
  bool binary_output_;
};

#define REGISTER_KERNELS(Tidx, T)                            \
  REGISTER_KERNEL_BUILDER(Name("SparseBincount")             \
                              .Device(DEVICE_CPU)            \
                              .TypeConstraint<T>("T")        \
                              .TypeConstraint<Tidx>("Tidx"), \
                          SparseBincountOp<CPUDevice, Tidx, T>);
#define REGISTER_CPU_KERNELS(T) \
  REGISTER_KERNELS(int32, T);   \
  REGISTER_KERNELS(int64_t, T);

TF_CALL_NUMBER_TYPES(REGISTER_CPU_KERNELS);
#undef REGISTER_CPU_KERNELS
#undef REGISTER_KERNELS
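
// RaggedBincountOp bins the values of a ragged tensor described by row
// `splits` and flat `values`, producing one bincount row per ragged row.
// Illustrative example: splits = [0, 2, 3], values = [1, 1, 0], size = 3,
// empty weights gives [[0, 2, 0], [1, 0, 0]].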

template <typename Device, typename Tidx, typename T>
class RaggedBincountOp : public OpKernel {
 public:
  explicit RaggedBincountOp(OpKernelConstruction* ctx) : OpKernel(ctx) {
    OP_REQUIRES_OK(ctx, ctx->GetAttr("binary_output", &binary_output_));
  }

  void Compute(OpKernelContext* ctx) override {
    const auto splits = ctx->input(0).flat<int64_t>();
    const auto values = ctx->input(1).flat<Tidx>();
    const Tensor& size_t = ctx->input(2);
    const auto weights = ctx->input(3).flat<T>();
    const int64_t weights_size = weights.size();

    OP_REQUIRES(ctx, size_t.dims() == 0,
                errors::InvalidArgument("Shape must be rank 0 but is rank ",
                                        size_t.dims()));
    Tidx size = size_t.scalar<Tidx>()();
    OP_REQUIRES(
        ctx, size >= 0,
        errors::InvalidArgument("size (", size, ") must be non-negative"));

    int num_rows = splits.size() - 1;
    int num_values = values.size();
    int batch_idx = 0;

    OP_REQUIRES(ctx, splits.size() > 0,
                errors::InvalidArgument("Splits must be non-empty"));

    OP_REQUIRES(ctx, splits(0) == 0,
                errors::InvalidArgument("Splits must start with 0, not with ",
                                        splits(0)));

    OP_REQUIRES(ctx, splits(num_rows) == num_values,
                errors::InvalidArgument(
                    "Splits must end with the number of values, got ",
                    splits(num_rows), " instead of ", num_values));

    Tensor* out_t;
    OP_REQUIRES_OK(
        ctx, ctx->allocate_output(0, TensorShape({num_rows, size}), &out_t));
    functor::SetZeroFunctor<Device, T> fill;
    fill(ctx->eigen_device<Device>(), out_t->flat<T>());
    const auto out = out_t->matrix<T>();

    for (int idx = 0; idx < num_values; ++idx) {
      while (idx >= splits(batch_idx)) {
        batch_idx++;
      }
      Tidx bin = values(idx);
      OP_REQUIRES(ctx, bin >= 0,
                  errors::InvalidArgument("Input must be non-negative"));
      if (bin < size) {
        if (binary_output_) {
          out(batch_idx - 1, bin) = T(1);
        } else {
          T value = (weights_size > 0) ? weights(idx) : T(1);
          out(batch_idx - 1, bin) += value;
        }
      }
    }
  }

 private:
  bool binary_output_;
};

#define REGISTER_KERNELS(Tidx, T)                            \
  REGISTER_KERNEL_BUILDER(Name("RaggedBincount")             \
                              .Device(DEVICE_CPU)            \
                              .TypeConstraint<T>("T")        \
                              .TypeConstraint<Tidx>("Tidx"), \
                          RaggedBincountOp<CPUDevice, Tidx, T>);
#define REGISTER_CPU_KERNELS(T) \
  REGISTER_KERNELS(int32, T);   \
  REGISTER_KERNELS(int64_t, T);

TF_CALL_NUMBER_TYPES(REGISTER_CPU_KERNELS);
#undef REGISTER_CPU_KERNELS
#undef REGISTER_KERNELS

}  // end namespace tensorflow