xref: /aosp_15_r20/external/tensorflow/tensorflow/core/profiler/convert/op_stats_combiner.cc (revision b6fb3261f9314811a0f4371741dbb8839866f948)
1 /* Copyright 2020 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/profiler/convert/op_stats_combiner.h"
17 
18 #include <algorithm>
19 
20 #include "absl/container/flat_hash_map.h"
21 #include "tensorflow/core/profiler/convert/op_metrics_db_combiner.h"
22 #include "tensorflow/core/profiler/convert/xplane_to_tf_functions.h"
23 #include "tensorflow/core/profiler/protobuf/diagnostics.pb.h"
24 #include "tensorflow/core/profiler/protobuf/hardware_types.pb.h"
25 #include "tensorflow/core/profiler/protobuf/kernel_stats.pb.h"
26 #include "tensorflow/core/profiler/protobuf/op_stats.pb.h"
27 #include "tensorflow/core/profiler/protobuf/steps_db.pb.h"
28 #include "tensorflow/core/profiler/utils/hardware_type_utils.h"
29 #include "tensorflow/core/profiler/utils/kernel_stats_utils.h"
30 #include "tensorflow/core/profiler/utils/step_intersection.h"
31 
32 namespace tensorflow {
33 namespace profiler {
34 
35 namespace {
36 
37 // Combines the src PerCoreStepInfo into the dst PerCoreStepInfo.
CombinePerCoreStepInfo(int src_host_id,const PerCoreStepInfo & src,bool use_incomplete_step,PerCoreStepInfo * dst,OpMetricsDbCombiner * hlo_metrics_db_complete_steps_only_combiner,OpMetricsDbCombiner * hlo_metrics_db_per_step_combiner)38 void CombinePerCoreStepInfo(
39     int src_host_id, const PerCoreStepInfo& src, bool use_incomplete_step,
40     PerCoreStepInfo* dst,
41     OpMetricsDbCombiner* hlo_metrics_db_complete_steps_only_combiner,
42     OpMetricsDbCombiner* hlo_metrics_db_per_step_combiner) {
43   CombineCoreIdMap(src_host_id, src.step_info_per_core(),
44                    dst->mutable_step_info_per_core());
45 
46   // Since we have assigned a new step number to the combined result, update
47   // the step number on each core to this new step number.
48   uint32 new_step_num = dst->step_num();
49   for (auto& percore_stepinfo : *dst->mutable_step_info_per_core()) {
50     auto& stepinfo = percore_stepinfo.second;
51     stepinfo.set_step_num(new_step_num);
52   }
53 
54   if (!use_incomplete_step) {
55     hlo_metrics_db_complete_steps_only_combiner->Combine(src.hlo_metrics_db());
56   }
57   hlo_metrics_db_per_step_combiner->Combine(src.hlo_metrics_db());
58   CombineCoreIdMap(src_host_id, src.all_reduce_db_per_core(),
59                    dst->mutable_all_reduce_db_per_core());
60   CombineCoreIdMap(src_host_id, src.core_id_to_replica_id_map(),
61                    dst->mutable_core_id_to_replica_id_map());
62 }
63 
CombineStepDatabase(int src_host_id,const StepIntersection & step_intersection,const StepDatabaseResult & src,StepDatabaseResult * dst,OpMetricsDbCombiner * hlo_metrics_db_complete_steps_only_combiner,std::vector<OpMetricsDbCombiner> * hlo_metrics_db_per_step_combiners)64 void CombineStepDatabase(
65     int src_host_id, const StepIntersection& step_intersection,
66     const StepDatabaseResult& src, StepDatabaseResult* dst,
67     OpMetricsDbCombiner* hlo_metrics_db_complete_steps_only_combiner,
68     std::vector<OpMetricsDbCombiner>* hlo_metrics_db_per_step_combiners) {
69   if (src.use_incomplete_step()) dst->set_use_incomplete_step(true);
70   uint32 src_first_step_idx = step_intersection.FirstStepIndex(src_host_id);
71   for (uint32 i = 0; i < step_intersection.NumSteps(); i++) {
72     CombinePerCoreStepInfo(
73         src_host_id, src.step_sequence(src_first_step_idx + i),
74         src.use_incomplete_step(), dst->mutable_step_sequence(i),
75         hlo_metrics_db_complete_steps_only_combiner,
76         &(*hlo_metrics_db_per_step_combiners)[i]);
77   }
78 }
79 
CombineRunEnvironment(const RunEnvironment & src,RunEnvironment * dst)80 void CombineRunEnvironment(const RunEnvironment& src, RunEnvironment* dst) {
81   dst->mutable_hostnames()->insert(src.hostnames().begin(),
82                                    src.hostnames().end());
83   dst->set_host_count(dst->hostnames_size());
84   if (src.device_type() != "CPU") {
85     dst->set_device_type(src.device_type());
86     dst->set_device_core_count(src.device_core_count() +
87                                dst->device_core_count());
88     // Replica count and num cores per replica must be same for all copies.
89     dst->set_replica_count(std::max(src.replica_count(), dst->replica_count()));
90     dst->set_num_cores_per_replica(
91         std::max(src.num_cores_per_replica(), dst->num_cores_per_replica()));
92     *dst->mutable_topology() = src.topology();
93   } else if (dst->device_type().empty()) {
94     dst->set_device_type(src.device_type());
95   }
96   dst->set_task_count(src.task_count() + dst->task_count());
97   (*dst->mutable_host_independent_job_info()) = src.host_independent_job_info();
98   for (const auto& job_info : src.host_dependent_job_info()) {
99     *(dst->add_host_dependent_job_info()) = job_info;
100   }
101   dst->set_host_trace_level(src.host_trace_level());
102 }
103 
104 // Combines the src PerfEnv into the dst PerfEnv.
CombinePerfEnv(const PerfEnv & src,PerfEnv * dst)105 void CombinePerfEnv(const PerfEnv& src, PerfEnv* dst) {
106   dst->set_peak_tera_flops_per_second(src.peak_tera_flops_per_second());
107   dst->set_peak_hbm_bw_giga_bytes_per_second(
108       src.peak_hbm_bw_giga_bytes_per_second());
109   dst->set_ridge_point(src.ridge_point());
110 }
111 
112 // Combines the src Diagnostics into the dst Diagnostics.
CombineDiagnostics(const Diagnostics & src,Diagnostics * dst)113 void CombineDiagnostics(const Diagnostics& src, Diagnostics* dst) {
114   dst->mutable_info()->MergeFrom(src.info());
115   dst->mutable_warnings()->MergeFrom(src.warnings());
116   dst->mutable_errors()->MergeFrom(src.errors());
117 }
118 
119 // Combine the src OpStats into the dst OpStats.
CombineOpStats(bool no_accelerator_in_system,int src_host_id,HardwareType hardware_type,const StepIntersection & step_intersection,const OpStats & src,OpStats * dst,OpMetricsDbCombiner * host_op_metrics_db_combiner,OpMetricsDbCombiner * device_op_metrics_db_combiner,OpMetricsDbCombiner * hlo_metrics_db_complete_steps_only_combiner,std::vector<OpMetricsDbCombiner> * hlo_metrics_db_per_step_combiners)120 void CombineOpStats(
121     bool no_accelerator_in_system, int src_host_id, HardwareType hardware_type,
122     const StepIntersection& step_intersection, const OpStats& src, OpStats* dst,
123     OpMetricsDbCombiner* host_op_metrics_db_combiner,
124     OpMetricsDbCombiner* device_op_metrics_db_combiner,
125     OpMetricsDbCombiner* hlo_metrics_db_complete_steps_only_combiner,
126     std::vector<OpMetricsDbCombiner>* hlo_metrics_db_per_step_combiners) {
127   // Combine host_metrics_db.
128   // Host OpMetricsDb does not need to update the number of cores a certain op
129   // occurs.
130   host_op_metrics_db_combiner->Combine(src.host_op_metrics_db(),
131                                        /*update_num_cores=*/false);
132   // Combine device_metrics_db.
133   device_op_metrics_db_combiner->Combine(src.device_op_metrics_db());
134 
135   // Combine step_db.
136   if (!IsCoordinator(no_accelerator_in_system, hardware_type)) {
137     CombineStepDatabase(src_host_id, step_intersection, src.step_db(),
138                         dst->mutable_step_db(),
139                         hlo_metrics_db_complete_steps_only_combiner,
140                         hlo_metrics_db_per_step_combiners);
141   }
142 
143   // Combine run environment info.
144   CombineRunEnvironment(src.run_environment(), dst->mutable_run_environment());
145 
146   // Combine the perf environment info.
147   CombinePerfEnv(src.perf_env(), dst->mutable_perf_env());
148 
149   // Combine diagnostics.
150   CombineDiagnostics(src.diagnostics(), dst->mutable_diagnostics());
151 
152   // Combine kernel stats.
153   dst->mutable_kernel_stats_db()->mutable_reports()->MergeFrom(
154       src.kernel_stats_db().reports());
155 
156   // Combine tf-function stats.
157   CombineTfFunctionDb(src.tf_function_db(), dst->mutable_tf_function_db());
158 
159   // Combine the mapping from core ID to details.
160   CombineCoreIdMap(src_host_id, src.core_id_to_details(),
161                    dst->mutable_core_id_to_details());
162 
163   // Combine performance counter result.
164   dst->mutable_performance_counter_result()
165       ->set_matrix_unit_utilization_percent(
166           dst->performance_counter_result().matrix_unit_utilization_percent() +
167           src.performance_counter_result().matrix_unit_utilization_percent());
168 }
169 
170 }  // namespace
171 
IsCoordinator(bool no_accelerator_in_system,HardwareType hardware_type)172 bool IsCoordinator(bool no_accelerator_in_system, HardwareType hardware_type) {
173   // A host is a coordinator if:
174   //   (1) The host doesn't have a device, and
175   //   (2) The system does use accelerator (if not, it uses CPU only and so this
176   //   host should be regarded as a worker as well).
177   return !HasDevice(hardware_type) && !no_accelerator_in_system;
178 }
179 
NoAcceleratorInSystem(const std::vector<OpStatsInfo> & all_op_stats_info)180 bool NoAcceleratorInSystem(const std::vector<OpStatsInfo>& all_op_stats_info) {
181   for (const auto& op_stats_info : all_op_stats_info) {
182     if (HasDevice(op_stats_info.hardware_type)) {
183       return false;
184     }
185   }
186   return true;
187 }
188 
// Maps a (host, device ordinal) pair to a system-wide core ID computed as
// host_id * kMaxDevicesPerHost + device_ordinal. IDs from different hosts
// stay distinct only while device_ordinal is below kMaxDevicesPerHost.
uint32 GlobalCoreId(int host_id, uint32 device_ordinal) {
  constexpr uint32 kMaxDevicesPerHost = 1000;  // power-of-10 for debuggability
  // The arithmetic is carried out in uint32; the conversion of the signed
  // host_id is spelled out instead of relying on the implicit promotion.
  return static_cast<uint32>(host_id) * kMaxDevicesPerHost + device_ordinal;
}
193 
ComputeStepIntersectionToMergeOpStats(const std::vector<OpStatsInfo> & all_op_stats_info,uint32 max_step_per_host)194 StepIntersection ComputeStepIntersectionToMergeOpStats(
195     const std::vector<OpStatsInfo>& all_op_stats_info,
196     uint32 max_step_per_host) {
197   bool no_accelerator_in_system = NoAcceleratorInSystem(all_op_stats_info);
198 
199   absl::flat_hash_map<uint32, const StepDatabaseResult*> per_host_step_db;
200   for (const auto& op_stats_info : all_op_stats_info) {
201     if (IsCoordinator(no_accelerator_in_system, op_stats_info.hardware_type))
202       continue;
203     // Includes only workers in per_host_step_db.
204     per_host_step_db[op_stats_info.src_host_id] =
205         &op_stats_info.op_stats->step_db();
206   }
207 
208   return StepIntersection(max_step_per_host, per_host_step_db);
209 }
210 
CombineAllOpStats(const std::vector<OpStatsInfo> & all_op_stats_info,const StepIntersection & step_intersection,OpStats * combined_op_stats)211 void CombineAllOpStats(const std::vector<OpStatsInfo>& all_op_stats_info,
212                        const StepIntersection& step_intersection,
213                        OpStats* combined_op_stats) {
214   StepDatabaseResult* combined_step_db = combined_op_stats->mutable_step_db();
215   // Initialize the StepDatabaseResult field that depends on the number of
216   // steps.
217   for (uint32 dst_step_num : step_intersection.DstStepNumbers()) {
218     combined_step_db->add_step_sequence()->set_step_num(dst_step_num);
219   }
220   // Record the number of steps that are dropped.
221   combined_step_db->set_num_steps_dropped(step_intersection.StepsDropped());
222 
223   combined_step_db->set_empty_intersect(step_intersection.EmptyIntersect());
224 
225   // Initialize all the OpMetricsDbCombiners.
226   OpMetricsDbCombiner host_op_metrics_db_combiner(
227       combined_op_stats->mutable_host_op_metrics_db());
228   OpMetricsDbCombiner device_op_metrics_db_combiner(
229       combined_op_stats->mutable_device_op_metrics_db());
230   OpMetricsDbCombiner hlo_metrics_db_complete_steps_only_combiner(
231       combined_op_stats->mutable_hlo_metrics_db_complete_steps_only());
232   std::vector<OpMetricsDbCombiner> hlo_metrics_db_per_step_combiners;
233   hlo_metrics_db_per_step_combiners.reserve(
234       combined_step_db->step_sequence_size());
235   for (PerCoreStepInfo& step_info :
236        *combined_step_db->mutable_step_sequence()) {
237     hlo_metrics_db_per_step_combiners.emplace_back(
238         step_info.mutable_hlo_metrics_db());
239   }
240 
241   bool no_accelerator_in_system = NoAcceleratorInSystem(all_op_stats_info);
242 
243   for (const auto& op_stats_info : all_op_stats_info) {
244     CombineOpStats(no_accelerator_in_system, op_stats_info.src_host_id,
245                    op_stats_info.hardware_type, step_intersection,
246                    *op_stats_info.op_stats, combined_op_stats,
247                    &host_op_metrics_db_combiner, &device_op_metrics_db_combiner,
248                    &hlo_metrics_db_complete_steps_only_combiner,
249                    &hlo_metrics_db_per_step_combiners);
250   }
251 
252   // Sorts all the kernel reports that have been merged by CombineTfOpStats and
253   // keeps only the top kernel reports with long kernel duration.
254   SortAndKeepTopKDurationKernelReportsInDb(
255       combined_op_stats->mutable_kernel_stats_db());
256 
257   // Process performance counter results.
258   combined_op_stats->mutable_performance_counter_result()
259       ->set_matrix_unit_utilization_percent(
260           combined_op_stats->performance_counter_result()
261               .matrix_unit_utilization_percent() /
262           all_op_stats_info.size());
263 }
264 
265 }  // namespace profiler
266 }  // namespace tensorflow
267