/* Copyright 2019 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_CORE_KERNELS_REDUX_FUNCTOR_H_
#define TENSORFLOW_CORE_KERNELS_REDUX_FUNCTOR_H_

#define EIGEN_USE_THREADS

#include "third_party/eigen3/Eigen/Core"
#include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/framework/tensor_types.h"
#include "tensorflow/core/platform/types.h"

namespace tensorflow {

using CPUDevice = Eigen::ThreadPoolDevice;

namespace functor {

// Compute reduction over outer dimensions.
// Example:
//   input: [D1, D2, ... , DN]
//   ->
//   output: [Di, ... , DN] where i belongs to the set [1,N]
template <typename InputT, typename AccumT, typename OutputT,
          typename BinaryFunctor>
struct ReduceOuterDimensions {
  ReduceOuterDimensions() {}

  template <int num_dims>
  void operator()(const CPUDevice& device,
                  const Eigen::DSizes<Eigen::Index, num_dims>& input_dims,
                  const Tensor& input, Tensor* output) const {
    // Compute inner and outer dim after reshaping into 2d tensor.
    const int num_output_dims = output->dims();
    auto output_dims = output->template flat<OutputT>().dimensions();

    Eigen::Index inner_dim = 1, outer_dim = 1;
    for (int i = 0; i < num_dims - num_output_dims; ++i)
      outer_dim *= input_dims[i];
    for (int i = num_dims - num_output_dims; i < num_dims; ++i)
      inner_dim *= input_dims[i];

    if (1 == outer_dim) {
      // Nothing to do but pass the input through to the output.
      output->template flat<OutputT>() =
          input.template flat<InputT>().template cast<OutputT>().reshape(
              output_dims);
      return;
    }

    // Get device thread num.
    const Eigen::Index num_threads = device.numThreads();

    // If the inner dim parallelism is large enough.
    // TODO(ezhulenev): There seems to be no benefit in going this route. Check
    // if this can be improved, or use a better heuristic?
    if (inner_dim > num_threads * 32) {
      // Do not create more blocks than there are threads in a pool.
      const Eigen::Index num_blocks = num_threads;

      // Block size along the inner dimension.
      const Eigen::Index inner_block_size = Eigen::divup(inner_dim, num_blocks);
      const InputT* input_data = input.template flat<InputT>().data();
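
      // Illustrative partitioning for this branch (numbers are hypothetical,
      // not from the original): with inner_dim = 4096 and num_threads = 8,
      // num_blocks = 8 and inner_block_size = divup(4096, 8) = 512, so block k
      // accumulates the column strip [k * 512, (k + 1) * 512) across all
      // outer_dim rows into its slice of the partial-reduction buffer below.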

      // Allocate temporary buffer for partial reductions.
      Eigen::Tensor<AccumT, 1, Eigen::RowMajor, Eigen::Index> buffer(
          {inner_dim});
      buffer.setZero();
      AccumT* buffer_data = buffer.data();

      using Buffer = Eigen::TensorMap<
          Eigen::Tensor<AccumT, 1, Eigen::RowMajor, Eigen::Index>,
          Eigen::Unaligned>;

      using Input = Eigen::TensorMap<
          Eigen::Tensor<const InputT, 1, Eigen::RowMajor, Eigen::Index>,
          Eigen::Unaligned>;

      const auto compute = [inner_dim, outer_dim, num_blocks, inner_block_size,
                            input_data, buffer_data](
                               Eigen::Index start, Eigen::Index limit) -> void {
        DCHECK(start >= 0 && limit <= num_blocks);
        Eigen::Index inner_dim_start = start * inner_block_size;
        Eigen::Index inner_dim_limit = limit * inner_block_size;
        inner_dim_limit = std::min(inner_dim, inner_dim_limit);
        Eigen::Index my_job_len = inner_dim_limit - inner_dim_start;

        const InputT* my_job_start = input_data + inner_dim_start;
        Buffer buf(buffer_data + inner_dim_start, my_job_len);

        for (Eigen::Index i = 0; i < outer_dim; ++i) {
          auto in = Input(my_job_start + i * inner_dim, my_job_len);
          auto cast = in.template cast<AccumT>();
          buf = Eigen::TensorCwiseBinaryOp<BinaryFunctor, const decltype(buf),
                                           const decltype(cast)>(buf, cast);
        }
      };

      // Compute cost of reducing a single block.
      const Eigen::Index compute_size = outer_dim * inner_block_size;
      const Eigen::Index compute_input_bytes = compute_size * sizeof(InputT);
      const Eigen::TensorOpCost cost(
          compute_input_bytes,
          0,  // We'll be mostly writing to L1, assume store cost is 0
          compute_size * Eigen::internal::functor_traits<BinaryFunctor>::Cost);

      device.parallelFor(num_blocks, cost, compute);

      // Write final result to the output.
      output->template flat<OutputT>() =
          buffer.template cast<OutputT>().reshape(output_dims);
    } else {
      // Compute block size along the outer dimension for efficiency.
      const Eigen::Index parallel_cell_size = inner_dim;
      const Eigen::Index total_workload = outer_dim * inner_dim;
      const Eigen::Index max_parallelism = total_workload / parallel_cell_size;

      const Eigen::Index min_block_workload = 2000;
      const Eigen::Index min_block_size =
          Eigen::divup(min_block_workload, parallel_cell_size);
      const Eigen::Index max_num_blocks = std::min(
          max_parallelism, Eigen::divup(total_workload, min_block_size));

      // Do not create more blocks than there are threads in a pool.
      const Eigen::Index num_blocks = std::min(max_num_blocks, num_threads);

      // Block size along the outer dimension.
      const Eigen::Index outer_block_size = Eigen::divup(outer_dim, num_blocks);

      const InputT* input_data = input.template flat<InputT>().data();
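
      // Worked example of the block heuristic above (hypothetical sizes, not
      // from the original): for outer_dim = 1024, inner_dim = 64 and 8
      // threads, parallel_cell_size = 64, total_workload = 65536,
      // min_block_size = divup(2000, 64) = 32, max_num_blocks =
      // min(1024, divup(65536, 32)) = 1024, num_blocks = min(1024, 8) = 8 and
      // outer_block_size = divup(1024, 8) = 128, so each block reduces 128
      // rows of length 64 into its own row of the partial-reduction buffer
      // below.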

      // Allocate temporary buffer for partial reductions.
      Tensor buffer(DataTypeToEnum<AccumT>::v(), {num_blocks, inner_dim});
      buffer.template flat<AccumT>().setZero();
      AccumT* buffer_data = buffer.template flat<AccumT>().data();

      using Buffer = Eigen::TensorMap<
          Eigen::Tensor<AccumT, 1, Eigen::RowMajor, Eigen::Index>,
          Eigen::Unaligned>;

      using Input = Eigen::TensorMap<
          Eigen::Tensor<const InputT, 1, Eigen::RowMajor, Eigen::Index>,
          Eigen::Unaligned>;

      const auto compute = [inner_dim, num_blocks, outer_block_size,
                            buffer_data, input_data, outer_dim](
                               Eigen::Index start, Eigen::Index limit) -> void {
        DCHECK(start >= 0 && limit <= num_blocks);
        Eigen::Index outer_dim_start = start * outer_block_size;
        Eigen::Index outer_dim_limit = limit * outer_block_size;
        outer_dim_limit = std::min(outer_dim, outer_dim_limit);

        Buffer buf(buffer_data + start * inner_dim, inner_dim);
        for (Eigen::Index i = outer_dim_start; i < outer_dim_limit; ++i) {
          auto in = Input(input_data + i * inner_dim, inner_dim);
          auto cast = in.template cast<AccumT>();
          buf = Eigen::TensorCwiseBinaryOp<BinaryFunctor, const decltype(buf),
                                           const decltype(cast)>(buf, cast);
        }
      };

      // Compute cost of reducing a single block.
      const Eigen::Index compute_size = outer_block_size * inner_dim;
      const Eigen::Index compute_input_bytes = compute_size * sizeof(InputT);
      const Eigen::TensorOpCost cost(
          compute_input_bytes,
          0,  // We'll be mostly writing to L1, assume store cost is 0
          compute_size * Eigen::internal::functor_traits<BinaryFunctor>::Cost);

      device.parallelFor(num_blocks, cost, compute);

      // Aggregate partial results from temporary buffer into first block.
      auto buf0 = Buffer(buffer_data, inner_dim);
      // Just sum the buffer up, as the inner dimension is not large in this
      // case.
      for (int i = 1; i < num_blocks; ++i) {
        auto buf = Buffer(buffer_data + i * inner_dim, inner_dim);
        buf0 = Eigen::TensorCwiseBinaryOp<BinaryFunctor, const decltype(buf0),
                                          const decltype(buf)>(buf0, buf);
      }
      // Write final result to the output.
      output->template flat<OutputT>() =
          buf0.template cast<OutputT>().reshape(output_dims);
    }
  }
};

// Compute reduction down to a contiguous set of middle dimensions (like an
// axis range).
// Example:
//   input: [D1, D2, ... , DN]
//   ->
//   output: [Di, ... , Dj] where i & j belong to the set [1,N].
template <typename InputT, typename AccumT, typename OutputT,
          typename BinaryFunctor, typename Reducer>
struct ReduceMiddleDimensions {
  ReduceMiddleDimensions() {}

  template <int num_dims>
  void operator()(const CPUDevice& device,
                  const Eigen::DSizes<Eigen::Index, num_dims>& input_dims,
                  const Tensor& input, Tensor* output,
                  const int axis_begin_dim) const {
    // Compute dims after reshaping into 3d tensor.
    const int num_output_dims = output->dims();
    auto output_dims = output->template flat<OutputT>().dimensions();

    Eigen::Index inner_dim = 1, middle_dim = 1, outer_dim = 1;
    for (int i = 0; i < axis_begin_dim; ++i) outer_dim *= input_dims[i];
    for (int i = axis_begin_dim; i < axis_begin_dim + num_output_dims; ++i)
      middle_dim *= input_dims[i];
    for (int i = axis_begin_dim + num_output_dims; i < num_dims; ++i)
      inner_dim *= input_dims[i];
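
    // Illustrative shapes (hypothetical, not from the original): for an input
    // of shape [2, 3, 4, 5] with axis_begin_dim = 1 and a rank-2 output of
    // shape [3, 4], this yields outer_dim = 2, middle_dim = 12 and
    // inner_dim = 5, i.e. the input is treated as a [2, 12, 5] 3-D tensor
    // reduced over its first and last dimensions.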

    if (1 == inner_dim * outer_dim) {
      // Nothing to do.
      output->template flat<OutputT>() =
          input.template flat<InputT>().template cast<OutputT>().reshape(
              output_dims);
      return;
    }

    // Compute block size along the outer dimension for efficiency.
    const Eigen::Index parallel_cell_size = inner_dim;
    const Eigen::Index max_parallelism = outer_dim * middle_dim;
    const Eigen::Index total_workload = max_parallelism * inner_dim;

    const Eigen::Index min_block_workload = 2000;
    const Eigen::Index min_block_size =
        Eigen::divup(min_block_workload, parallel_cell_size);
    const Eigen::Index max_num_blocks =
        std::min(max_parallelism, Eigen::divup(total_workload, min_block_size));

    // Do not create more blocks than there are threads in a pool.
    const Eigen::Index num_threads = device.numThreads();
    const Eigen::Index num_blocks = std::min(max_num_blocks, num_threads);

    // Block size along the outer dimension.
    const Eigen::Index outer_block_size =
        Eigen::divup(total_workload, num_blocks);

    const InputT* input_data = input.template flat<InputT>().data();

    // Allocate temporary buffer for partial reductions.
    Eigen::Tensor<AccumT, 2> buffer(num_blocks, middle_dim);
    buffer.setZero();
    AccumT* buffer_data = buffer.data();

    using Buffer = Eigen::TensorMap<Eigen::Tensor<AccumT, 1>>;
    using Input = Eigen::TensorMap<Eigen::Tensor<const InputT, 1>>;

    Eigen::array<Eigen::Index, 1> reduction_axis = {0};
    Reducer reducer;
    const BinaryFunctor binary_op;

    const auto compute = [inner_dim, middle_dim, input_data, buffer_data,
                          total_workload, num_blocks, outer_block_size,
                          reduction_axis, reducer, binary_op](
                             Eigen::Index start, Eigen::Index limit) -> void {
      DCHECK(start >= 0 && limit <= num_blocks);
      Eigen::Index block_start = start * outer_block_size;
      Eigen::Index block_limit = limit * outer_block_size;
      block_limit = std::min(total_workload, block_limit);
      Buffer buf(buffer_data + start * middle_dim, middle_dim);

      const Eigen::Index align_start =
          ((block_start + inner_dim - 1) / inner_dim) * inner_dim;
      const Eigen::Index align_end = (block_limit / inner_dim) * inner_dim;

      Eigen::Index coordinate = block_start / inner_dim % middle_dim;
      Eigen::Tensor<AccumT, 0> reduced =
          Input(&input_data[block_start], align_start - block_start)
              .reduce(reduction_axis, reducer)
              .template cast<AccumT>();

      buf(coordinate) = binary_op(buf(coordinate), reduced(0));

      coordinate = align_start / inner_dim % middle_dim;
      for (Eigen::Index i = align_start; i < align_end; i += inner_dim) {
        reduced = Input(&input_data[i], inner_dim)
                      .reduce(reduction_axis, reducer)
                      .template cast<AccumT>();
        buf(coordinate) = binary_op(buf(coordinate), reduced(0));
        ++coordinate;
        if (middle_dim == coordinate) coordinate = 0;
      }

      reduced = Input(&input_data[align_end], block_limit - align_end)
                    .reduce(reduction_axis, reducer)
                    .template cast<AccumT>();
      buf(coordinate) = binary_op(buf(coordinate), reduced(0));
    };
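
    // Sketch of the head/body/tail split inside the lambda above
    // (hypothetical numbers, not from the original): with inner_dim = 5 and a
    // block covering flat indices [7, 23), align_start = 10 and
    // align_end = 20, so [7, 10) and [20, 23) are partial rows reduced
    // separately, while [10, 20) covers two complete rows of the reshaped
    // [outer_dim * middle_dim, inner_dim] view.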

    // Compute cost of reducing a single block.
    const Eigen::Index compute_size = outer_block_size * inner_dim;
    const Eigen::Index compute_input_bytes = compute_size * sizeof(InputT);
    const Eigen::TensorOpCost cost(
        compute_input_bytes,
        0,  // We'll be mostly writing to L1, assume store cost is 0
        compute_size * Eigen::internal::functor_traits<BinaryFunctor>::Cost);

    device.parallelFor(num_blocks, cost, compute);

    using Output = Eigen::TensorMap<
        Eigen::Tensor<AccumT, 1, Eigen::RowMajor, Eigen::Index>,
        Eigen::Unaligned>;
    // Aggregate partial results from temporary buffer into first block.
    auto buf0 = Output(buffer_data, middle_dim);
    // TODO(ezhulenev): Parallelize this loop for large inner dimensions?
    for (int i = 1; i < num_blocks; ++i) {
      auto buf = Output(buffer_data + i * middle_dim, middle_dim);
      buf0 = Eigen::TensorCwiseBinaryOp<BinaryFunctor, const decltype(buf0),
                                        const decltype(buf)>(buf0, buf);
    }

    // Write final result to the output.
    output->template flat<OutputT>() =
        buf0.template cast<OutputT>().reshape(output_dims);
  }
};

}  // namespace functor
}  // namespace tensorflow

#endif  // TENSORFLOW_CORE_KERNELS_REDUX_FUNCTOR_H_