xref: /aosp_15_r20/external/tensorflow/tensorflow/core/util/debug_events_writer_test.cc (revision b6fb3261f9314811a0f4371741dbb8839866f948)
1 /* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/util/debug_events_writer.h"
17 
18 #include <vector>
19 
20 #include "absl/container/flat_hash_set.h"
21 #include "tensorflow/core/lib/core/status_test_util.h"
22 #include "tensorflow/core/lib/core/threadpool.h"
23 #include "tensorflow/core/lib/io/path.h"
24 #include "tensorflow/core/lib/io/record_reader.h"
25 #include "tensorflow/core/lib/strings/stringprintf.h"
26 #include "tensorflow/core/platform/test.h"
27 #include "tensorflow/core/protobuf/graph_debug_info.pb.h"
28 
29 namespace tensorflow {
30 namespace tfdbg {
31 
32 // shorthand
env()33 Env* env() { return Env::Default(); }
34 
35 class DebugEventsWriterTest : public ::testing::Test {
36  public:
GetDebugEventFileName(DebugEventsWriter * writer,DebugEventFileType type)37   static string GetDebugEventFileName(DebugEventsWriter* writer,
38                                       DebugEventFileType type) {
39     return writer->FileName(type);
40   }
41 
ReadDebugEventProtos(DebugEventsWriter * writer,DebugEventFileType type,std::vector<DebugEvent> * protos)42   static void ReadDebugEventProtos(DebugEventsWriter* writer,
43                                    DebugEventFileType type,
44                                    std::vector<DebugEvent>* protos) {
45     protos->clear();
46     const string filename = writer->FileName(type);
47     std::unique_ptr<RandomAccessFile> debug_events_file;
48     TF_CHECK_OK(env()->NewRandomAccessFile(filename, &debug_events_file));
49     io::RecordReader* reader = new io::RecordReader(debug_events_file.get());
50 
51     uint64 offset = 0;
52     DebugEvent actual;
53     while (ReadDebugEventProto(reader, &offset, &actual)) {
54       protos->push_back(actual);
55     }
56 
57     delete reader;
58   }
59 
ReadDebugEventProto(io::RecordReader * reader,uint64 * offset,DebugEvent * proto)60   static bool ReadDebugEventProto(io::RecordReader* reader, uint64* offset,
61                                   DebugEvent* proto) {
62     tstring record;
63     Status s = reader->ReadRecord(offset, &record);
64     if (!s.ok()) {
65       return false;
66     }
67     return ParseProtoUnlimited(proto, record);
68   }
69 
SetUp()70   void SetUp() override {
71     dump_root_ = io::JoinPath(
72         testing::TmpDir(),
73         strings::Printf("%010lld", static_cast<long long>(env()->NowMicros())));
74     tfdbg_run_id_ = "test_tfdbg_run_id";
75   }
76 
TearDown()77   void TearDown() override {
78     if (env()->IsDirectory(dump_root_).ok()) {
79       int64_t undeleted_files = 0;
80       int64_t undeleted_dirs = 0;
81       TF_ASSERT_OK(env()->DeleteRecursively(dump_root_, &undeleted_files,
82                                             &undeleted_dirs));
83       ASSERT_EQ(0, undeleted_files);
84       ASSERT_EQ(0, undeleted_dirs);
85     }
86   }
87 
88   string dump_root_;
89   string tfdbg_run_id_;
90 };
91 
// Two lookups with the same dump root must hand back the same writer
// instance (per-dump-root singleton).
TEST_F(DebugEventsWriterTest, GetDebugEventsWriterSameRootGivesSameObject) {
  DebugEventsWriter* const first = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, DebugEventsWriter::kDefaultCyclicBufferSize);
  DebugEventsWriter* const second = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, DebugEventsWriter::kDefaultCyclicBufferSize);
  EXPECT_EQ(first, second);
}
100 
// Concurrent lookups with the same dump root must all return one writer.
TEST_F(DebugEventsWriterTest, ConcurrentGetDebugEventsWriterSameDumpRoot) {
  std::vector<DebugEventsWriter*> writers;
  mutex mu;
  auto fn = [this, &writers, &mu]() {
    DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
        dump_root_, tfdbg_run_id_, DebugEventsWriter::kDefaultCyclicBufferSize);
    {
      // `writers` is shared by all pool threads.
      mutex_lock l(mu);
      writers.push_back(writer);
    }
  };
  {
    // The test relies on pool destruction (end of this scope) to finish all
    // scheduled closures before `writers` is inspected.
    thread::ThreadPool thread_pool(Env::Default(), "test_pool", 4);
    for (size_t i = 0; i < 4; ++i) {
      thread_pool.Schedule(fn);
    }
  }

  // Fatal: the element accesses below would be out of bounds otherwise.
  ASSERT_EQ(writers.size(), 4);
  EXPECT_EQ(writers[0], writers[1]);
  EXPECT_EQ(writers[1], writers[2]);
  EXPECT_EQ(writers[2], writers[3]);
}
125 
// Concurrent lookups with distinct dump roots must return distinct writers.
TEST_F(DebugEventsWriterTest, ConcurrentGetDebugEventsWriterDiffDumpRoots) {
  std::atomic_int_fast64_t counter(0);
  std::vector<DebugEventsWriter*> writers;
  mutex mu;
  auto fn = [this, &counter, &writers, &mu]() {
    // int_fast64_t is not `long` on every ABI, so cast explicitly to the type
    // named by the format specifier.
    const string new_dump_root = io::JoinPath(
        dump_root_,
        strings::Printf("%lld", static_cast<long long>(counter.fetch_add(1))));
    DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
        new_dump_root, tfdbg_run_id_,
        DebugEventsWriter::kDefaultCyclicBufferSize);
    {
      // `writers` is shared by all pool threads.
      mutex_lock l(mu);
      writers.push_back(writer);
    }
  };
  {
    // The test relies on pool destruction (end of this scope) to finish all
    // scheduled closures before `writers` is inspected.
    thread::ThreadPool thread_pool(Env::Default(), "test_pool", 3);
    for (size_t i = 0; i < 3; ++i) {
      thread_pool.Schedule(fn);
    }
  }

  // Fatal: the element accesses below would be out of bounds otherwise.
  ASSERT_EQ(writers.size(), 3);
  EXPECT_NE(writers[0], writers[1]);
  EXPECT_NE(writers[0], writers[2]);
  EXPECT_NE(writers[1], writers[2]);
}
154 
// Writers obtained for different directories must be different objects.
TEST_F(DebugEventsWriterTest, GetDebugEventsWriterDifferentRoots) {
  const string nested_root = io::JoinPath(dump_root_, "subdirectory");
  DebugEventsWriter* const outer = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, DebugEventsWriter::kDefaultCyclicBufferSize);
  DebugEventsWriter* const inner = DebugEventsWriter::GetDebugEventsWriter(
      nested_root, tfdbg_run_id_, DebugEventsWriter::kDefaultCyclicBufferSize);
  EXPECT_NE(outer, inner);
}
164 
// Init() followed by Close() must leave exactly one metadata event on disk
// and create empty .source_files / .stack_frames files.
TEST_F(DebugEventsWriterTest, GetAndInitDebugEventsWriter) {
  DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, DebugEventsWriter::kDefaultCyclicBufferSize);
  TF_ASSERT_OK(writer->Init());
  TF_ASSERT_OK(writer->Close());

  // Verify the metadata file's content.
  std::vector<DebugEvent> actuals;
  ReadDebugEventProtos(writer, DebugEventFileType::METADATA, &actuals);
  // Fatal: actuals[0] is dereferenced below.
  ASSERT_EQ(actuals.size(), 1);
  EXPECT_GT(actuals[0].debug_metadata().tensorflow_version().length(), 0);
  // Check the content of the file version string: it must start with the
  // version prefix and carry something after it.
  const string file_version = actuals[0].debug_metadata().file_version();
  EXPECT_EQ(file_version.find(DebugEventsWriter::kVersionPrefix), 0);
  EXPECT_GT(file_version.size(), strlen(DebugEventsWriter::kVersionPrefix));
  // Check the tfdbg run ID.
  EXPECT_EQ(actuals[0].debug_metadata().tfdbg_run_id(), "test_tfdbg_run_id");

  // Verify that the .source_files file has been created and is empty.
  // (Opening a missing file CHECK-fails inside ReadDebugEventProtos.)
  ReadDebugEventProtos(writer, DebugEventFileType::SOURCE_FILES, &actuals);
  EXPECT_EQ(actuals.size(), 0);
  // Verify that the .stack_frames file has been created and is empty.
  ReadDebugEventProtos(writer, DebugEventFileType::STACK_FRAMES, &actuals);
  EXPECT_EQ(actuals.size(), 0);
}
188 
// Close() on a writer that was never initialized must report OK.
TEST_F(DebugEventsWriterTest, CallingCloseWithoutInitIsOkay) {
  DebugEventsWriter* const uninitialized_writer =
      DebugEventsWriter::GetDebugEventsWriter(
          dump_root_, tfdbg_run_id_,
          DebugEventsWriter::kDefaultCyclicBufferSize);
  TF_ASSERT_OK(uninitialized_writer->Close());
}
194 
// A second Close() on the same writer must also report OK.
TEST_F(DebugEventsWriterTest, CallingCloseTwiceIsOkay) {
  DebugEventsWriter* const target = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, DebugEventsWriter::kDefaultCyclicBufferSize);
  for (int attempt = 0; attempt < 2; ++attempt) {
    TF_ASSERT_OK(target->Close());
  }
}
201 
// Concurrent calls to Init() must all succeed and still produce exactly one
// metadata event.
TEST_F(DebugEventsWriterTest, ConcurrentInitCalls) {
  DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, DebugEventsWriter::kDefaultCyclicBufferSize);

  auto fn = [writer]() { TF_ASSERT_OK(writer->Init()); };
  {
    // The test relies on pool destruction (end of this scope) to finish all
    // scheduled Init() calls before the writer is closed.
    thread::ThreadPool thread_pool(Env::Default(), "test_pool", 4);
    for (size_t i = 0; i < 3; ++i) {
      thread_pool.Schedule(fn);
    }
  }

  TF_ASSERT_OK(writer->Close());

  // Verify the metadata file's content.
  std::vector<DebugEvent> actuals;
  ReadDebugEventProtos(writer, DebugEventFileType::METADATA, &actuals);
  // Fatal: actuals[0] is dereferenced below.
  ASSERT_EQ(actuals.size(), 1);
  EXPECT_GT(actuals[0].debug_metadata().tensorflow_version().length(), 0);
  // Check the content of the file version string.
  const string file_version = actuals[0].debug_metadata().file_version();
  EXPECT_EQ(file_version.find(DebugEventsWriter::kVersionPrefix), 0);
  EXPECT_GT(file_version.size(), strlen(DebugEventsWriter::kVersionPrefix));
  EXPECT_EQ(actuals[0].debug_metadata().tfdbg_run_id(), "test_tfdbg_run_id");

  // Verify that the .source_files file has been created and is empty.
  // (Opening a missing file CHECK-fails inside ReadDebugEventProtos.)
  ReadDebugEventProtos(writer, DebugEventFileType::SOURCE_FILES, &actuals);
  EXPECT_EQ(actuals.size(), 0);
  // Verify that the .stack_frames file has been created and is empty.
  ReadDebugEventProtos(writer, DebugEventFileType::STACK_FRAMES, &actuals);
  EXPECT_EQ(actuals.size(), 0);
}
233 
// Init() must be idempotent: a second call keeps the same metadata file and
// does not add a second metadata event.
TEST_F(DebugEventsWriterTest, InitTwiceDoesNotCreateNewMetadataFile) {
  DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, DebugEventsWriter::kDefaultCyclicBufferSize);
  TF_ASSERT_OK(writer->Init());

  std::vector<DebugEvent> actuals;
  ReadDebugEventProtos(writer, DebugEventFileType::METADATA, &actuals);
  // Fatal: actuals[0] is dereferenced below.
  ASSERT_EQ(actuals.size(), 1);
  EXPECT_GT(actuals[0].debug_metadata().tensorflow_version().length(), 0);
  EXPECT_EQ(actuals[0].debug_metadata().tfdbg_run_id(), "test_tfdbg_run_id");
  // `size() >= 0` is always true for an unsigned size; require a non-empty
  // version string instead.
  EXPECT_GT(actuals[0].debug_metadata().file_version().size(), 0);

  string metadata_path_1 =
      GetDebugEventFileName(writer, DebugEventFileType::METADATA);
  TF_ASSERT_OK(writer->Init());
  EXPECT_EQ(GetDebugEventFileName(writer, DebugEventFileType::METADATA),
            metadata_path_1);
  TF_ASSERT_OK(writer->Close());

  // Verify the metadata file's content after the second Init() and Close().
  ReadDebugEventProtos(writer, DebugEventFileType::METADATA, &actuals);
  ASSERT_EQ(actuals.size(), 1);
  EXPECT_GT(actuals[0].debug_metadata().tensorflow_version().length(), 0);
  EXPECT_EQ(actuals[0].debug_metadata().tfdbg_run_id(), "test_tfdbg_run_id");
  EXPECT_GT(actuals[0].debug_metadata().file_version().size(), 0);
}
261 
TEST_F(DebugEventsWriterTest,WriteSourceFile)262 TEST_F(DebugEventsWriterTest, WriteSourceFile) {
263   DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
264       dump_root_, tfdbg_run_id_, DebugEventsWriter::kDefaultCyclicBufferSize);
265   TF_ASSERT_OK(writer->Init());
266 
267   SourceFile* source_file_1 = new SourceFile();
268   source_file_1->set_file_path("/home/tf_programs/main.py");
269   source_file_1->set_host_name("localhost.localdomain");
270   source_file_1->add_lines("import tensorflow as tf");
271   source_file_1->add_lines("");
272   source_file_1->add_lines("print(tf.constant([42.0]))");
273   source_file_1->add_lines("");
274   TF_ASSERT_OK(writer->WriteSourceFile(source_file_1));
275 
276   SourceFile* source_file_2 = new SourceFile();
277   source_file_2->set_file_path("/home/tf_programs/train.py");
278   source_file_2->set_host_name("localhost.localdomain");
279   source_file_2->add_lines("import tensorflow.keras as keras");
280   source_file_2->add_lines("");
281   source_file_2->add_lines("model = keras.Sequential()");
282   TF_ASSERT_OK(writer->WriteSourceFile(source_file_2));
283 
284   TF_ASSERT_OK(writer->FlushNonExecutionFiles());
285   TF_ASSERT_OK(writer->Close());
286 
287   std::vector<DebugEvent> actuals;
288   ReadDebugEventProtos(writer, DebugEventFileType::SOURCE_FILES, &actuals);
289   EXPECT_EQ(actuals.size(), 2);
290   EXPECT_GT(actuals[0].wall_time(), 0);
291   EXPECT_GT(actuals[1].wall_time(), actuals[0].wall_time());
292 
293   SourceFile actual_source_file_1 = actuals[0].source_file();
294   EXPECT_EQ(actual_source_file_1.file_path(), "/home/tf_programs/main.py");
295   EXPECT_EQ(actual_source_file_1.host_name(), "localhost.localdomain");
296   EXPECT_EQ(actual_source_file_1.lines().size(), 4);
297   EXPECT_EQ(actual_source_file_1.lines()[0], "import tensorflow as tf");
298   EXPECT_EQ(actual_source_file_1.lines()[1], "");
299   EXPECT_EQ(actual_source_file_1.lines()[2], "print(tf.constant([42.0]))");
300   EXPECT_EQ(actual_source_file_1.lines()[3], "");
301 
302   SourceFile actual_source_file_2 = actuals[1].source_file();
303   EXPECT_EQ(actual_source_file_2.file_path(), "/home/tf_programs/train.py");
304   EXPECT_EQ(actual_source_file_2.host_name(), "localhost.localdomain");
305   EXPECT_EQ(actual_source_file_2.lines().size(), 3);
306   EXPECT_EQ(actual_source_file_2.lines()[0],
307             "import tensorflow.keras as keras");
308   EXPECT_EQ(actual_source_file_2.lines()[1], "");
309   EXPECT_EQ(actual_source_file_2.lines()[2], "model = keras.Sequential()");
310 
311   // Verify no cross talk in the other non-execution debug-event files.
312   ReadDebugEventProtos(writer, DebugEventFileType::STACK_FRAMES, &actuals);
313   EXPECT_EQ(actuals.size(), 0);
314   ReadDebugEventProtos(writer, DebugEventFileType::GRAPHS, &actuals);
315   EXPECT_EQ(actuals.size(), 0);
316   ReadDebugEventProtos(writer, DebugEventFileType::EXECUTION, &actuals);
317   EXPECT_EQ(actuals.size(), 0);
318   ReadDebugEventProtos(writer, DebugEventFileType::GRAPH_EXECUTION_TRACES,
319                        &actuals);
320   EXPECT_EQ(actuals.size(), 0);
321 }
322 
TEST_F(DebugEventsWriterTest,WriteStackFramesFile)323 TEST_F(DebugEventsWriterTest, WriteStackFramesFile) {
324   DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
325       dump_root_, tfdbg_run_id_, DebugEventsWriter::kDefaultCyclicBufferSize);
326   TF_ASSERT_OK(writer->Init());
327 
328   StackFrameWithId* stack_frame_1 = new StackFrameWithId();
329   stack_frame_1->set_id("deadbeaf");
330   GraphDebugInfo::FileLineCol* file_line_col =
331       stack_frame_1->mutable_file_line_col();
332   file_line_col->set_file_index(12);
333   file_line_col->set_line(20);
334   file_line_col->set_col(2);
335   file_line_col->set_func("my_func");
336   file_line_col->set_code("  x = y + z");
337 
338   StackFrameWithId* stack_frame_2 = new StackFrameWithId();
339   stack_frame_2->set_id("eeeeeeec");
340   file_line_col = stack_frame_2->mutable_file_line_col();
341   file_line_col->set_file_index(12);
342   file_line_col->set_line(21);
343   file_line_col->set_col(4);
344   file_line_col->set_func("my_func");
345   file_line_col->set_code("  x = x ** 2.0");
346 
347   TF_ASSERT_OK(writer->WriteStackFrameWithId(stack_frame_1));
348   TF_ASSERT_OK(writer->WriteStackFrameWithId(stack_frame_2));
349   TF_ASSERT_OK(writer->FlushNonExecutionFiles());
350   TF_ASSERT_OK(writer->Close());
351 
352   std::vector<DebugEvent> actuals;
353   ReadDebugEventProtos(writer, DebugEventFileType::STACK_FRAMES, &actuals);
354   EXPECT_EQ(actuals.size(), 2);
355   EXPECT_GT(actuals[0].wall_time(), 0);
356   EXPECT_GT(actuals[1].wall_time(), actuals[0].wall_time());
357 
358   StackFrameWithId actual_stack_frame_1 = actuals[0].stack_frame_with_id();
359   EXPECT_EQ(actual_stack_frame_1.id(), "deadbeaf");
360   GraphDebugInfo::FileLineCol file_line_col_1 =
361       actual_stack_frame_1.file_line_col();
362   EXPECT_EQ(file_line_col_1.file_index(), 12);
363   EXPECT_EQ(file_line_col_1.line(), 20);
364   EXPECT_EQ(file_line_col_1.col(), 2);
365   EXPECT_EQ(file_line_col_1.func(), "my_func");
366   EXPECT_EQ(file_line_col_1.code(), "  x = y + z");
367 
368   StackFrameWithId actual_stack_frame_2 = actuals[1].stack_frame_with_id();
369   EXPECT_EQ(actual_stack_frame_2.id(), "eeeeeeec");
370   GraphDebugInfo::FileLineCol file_line_col_2 =
371       actual_stack_frame_2.file_line_col();
372   EXPECT_EQ(file_line_col_2.file_index(), 12);
373   EXPECT_EQ(file_line_col_2.line(), 21);
374   EXPECT_EQ(file_line_col_2.col(), 4);
375   EXPECT_EQ(file_line_col_2.func(), "my_func");
376   EXPECT_EQ(file_line_col_2.code(), "  x = x ** 2.0");
377 
378   // Verify no cross talk in the other non-execution debug-event files.
379   ReadDebugEventProtos(writer, DebugEventFileType::SOURCE_FILES, &actuals);
380   EXPECT_EQ(actuals.size(), 0);
381   ReadDebugEventProtos(writer, DebugEventFileType::GRAPHS, &actuals);
382   EXPECT_EQ(actuals.size(), 0);
383 }
384 
// A GraphOpCreation and a DebuggedGraph written through the writer must both
// land in the .graphs file, in write order, and nowhere else.
TEST_F(DebugEventsWriterTest, WriteGraphOpCreationAndDebuggedGraph) {
  DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, DebugEventsWriter::kDefaultCyclicBufferSize);
  TF_ASSERT_OK(writer->Init());

  // NOTE(review): the protos are heap-allocated and never deleted here, so
  // the writer presumably takes ownership -- confirm against
  // debug_events_writer.h.
  GraphOpCreation* graph_op_creation = new GraphOpCreation();
  graph_op_creation->set_op_type("MatMul");
  graph_op_creation->set_op_name("Dense_1/MatMul");
  TF_ASSERT_OK(writer->WriteGraphOpCreation(graph_op_creation));

  DebuggedGraph* debugged_graph = new DebuggedGraph();
  debugged_graph->set_graph_id("deadbeaf");
  debugged_graph->set_graph_name("my_func_graph");
  TF_ASSERT_OK(writer->WriteDebuggedGraph(debugged_graph));

  TF_ASSERT_OK(writer->FlushNonExecutionFiles());
  TF_ASSERT_OK(writer->Close());

  std::vector<DebugEvent> actuals;
  ReadDebugEventProtos(writer, DebugEventFileType::GRAPHS, &actuals);
  // Fatal: actuals[0] and actuals[1] are dereferenced below.
  ASSERT_EQ(actuals.size(), 2);
  EXPECT_GT(actuals[0].wall_time(), 0);
  EXPECT_GT(actuals[1].wall_time(), actuals[0].wall_time());

  // References stay valid until `actuals` is refilled further down.
  const GraphOpCreation& actual_op_creation = actuals[0].graph_op_creation();
  EXPECT_EQ(actual_op_creation.op_type(), "MatMul");
  EXPECT_EQ(actual_op_creation.op_name(), "Dense_1/MatMul");

  const DebuggedGraph& actual_debugged_graph = actuals[1].debugged_graph();
  EXPECT_EQ(actual_debugged_graph.graph_id(), "deadbeaf");
  EXPECT_EQ(actual_debugged_graph.graph_name(), "my_func_graph");

  // Verify no cross talk in the other non-execution debug-event files.
  ReadDebugEventProtos(writer, DebugEventFileType::SOURCE_FILES, &actuals);
  EXPECT_EQ(actuals.size(), 0);
  ReadDebugEventProtos(writer, DebugEventFileType::STACK_FRAMES, &actuals);
  EXPECT_EQ(actuals.size(), 0);
}
423 
// Many concurrent WriteSourceFile() calls must each produce exactly one
// record in the .source_files file.
TEST_F(DebugEventsWriterTest, ConcurrentWriteCallsToTheSameFile) {
  const size_t kConcurrentWrites = 100;
  DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, DebugEventsWriter::kDefaultCyclicBufferSize);
  TF_ASSERT_OK(writer->Init());

  std::atomic_int_fast64_t counter(0);
  auto fn = [writer, &counter]() {
    // int_fast64_t is not `long` on every ABI, so cast explicitly to the type
    // named by the format specifier.
    const string file_path =
        strings::Printf("/home/tf_programs/program_%.3lld.py",
                        static_cast<long long>(counter.fetch_add(1)));
    // NOTE(review): never deleted here, so the writer presumably takes
    // ownership -- confirm against debug_events_writer.h.
    SourceFile* source_file = new SourceFile();
    source_file->set_file_path(file_path);
    source_file->set_host_name("localhost.localdomain");
    TF_ASSERT_OK(writer->WriteSourceFile(source_file));
  };
  {
    // The test relies on pool destruction (end of this scope) to finish all
    // scheduled writes before the writer is closed.
    thread::ThreadPool thread_pool(Env::Default(), "test_pool", 8);
    for (size_t i = 0; i < kConcurrentWrites; ++i) {
      thread_pool.Schedule(fn);
    }
  }

  TF_ASSERT_OK(writer->Close());

  std::vector<DebugEvent> actuals;
  ReadDebugEventProtos(writer, DebugEventFileType::SOURCE_FILES, &actuals);
  // Fatal: the loops below index actuals[0 .. kConcurrentWrites).
  ASSERT_EQ(actuals.size(), kConcurrentWrites);
  std::vector<string> file_paths;
  std::vector<string> host_names;
  for (size_t i = 0; i < kConcurrentWrites; ++i) {
    file_paths.push_back(actuals[i].source_file().file_path());
    host_names.push_back(actuals[i].source_file().host_name());
  }
  // Write order across threads is arbitrary; sort before comparing against
  // the expected sequence of names.
  std::sort(file_paths.begin(), file_paths.end());
  for (size_t i = 0; i < kConcurrentWrites; ++i) {
    EXPECT_EQ(file_paths[i],
              strings::Printf("/home/tf_programs/program_%.3zu.py", i));
    EXPECT_EQ(host_names[i], "localhost.localdomain");
  }
}
464 
// Concurrent WriteSourceFile() calls interleaved with
// FlushNonExecutionFiles() must each produce exactly one record in the
// .source_files file.
TEST_F(DebugEventsWriterTest, ConcurrentWriteAndFlushCallsToTheSameFile) {
  const size_t kConcurrentWrites = 100;
  DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, DebugEventsWriter::kDefaultCyclicBufferSize);
  TF_ASSERT_OK(writer->Init());

  std::atomic_int_fast64_t counter(0);
  auto fn = [writer, &counter]() {
    // int_fast64_t is not `long` on every ABI, so cast explicitly to the type
    // named by the format specifier.
    const string file_path =
        strings::Printf("/home/tf_programs/program_%.3lld.py",
                        static_cast<long long>(counter.fetch_add(1)));
    // NOTE(review): never deleted here, so the writer presumably takes
    // ownership -- confirm against debug_events_writer.h.
    SourceFile* source_file = new SourceFile();
    source_file->set_file_path(file_path);
    source_file->set_host_name("localhost.localdomain");
    TF_ASSERT_OK(writer->WriteSourceFile(source_file));
    TF_ASSERT_OK(writer->FlushNonExecutionFiles());
  };
  {
    // The test relies on pool destruction (end of this scope) to finish all
    // scheduled writes before the writer is closed.
    thread::ThreadPool thread_pool(Env::Default(), "test_pool", 8);
    for (size_t i = 0; i < kConcurrentWrites; ++i) {
      thread_pool.Schedule(fn);
    }
  }

  TF_ASSERT_OK(writer->Close());

  std::vector<DebugEvent> actuals;
  ReadDebugEventProtos(writer, DebugEventFileType::SOURCE_FILES, &actuals);
  // Fatal: the loops below index actuals[0 .. kConcurrentWrites).
  ASSERT_EQ(actuals.size(), kConcurrentWrites);
  std::vector<string> file_paths;
  std::vector<string> host_names;
  for (size_t i = 0; i < kConcurrentWrites; ++i) {
    file_paths.push_back(actuals[i].source_file().file_path());
    host_names.push_back(actuals[i].source_file().host_name());
  }
  // Write order across threads is arbitrary; sort before comparing against
  // the expected sequence of names.
  std::sort(file_paths.begin(), file_paths.end());
  for (size_t i = 0; i < kConcurrentWrites; ++i) {
    EXPECT_EQ(file_paths[i],
              strings::Printf("/home/tf_programs/program_%.3zu.py", i));
    EXPECT_EQ(host_names[i], "localhost.localdomain");
  }
}
506 
TEST_F(DebugEventsWriterTest,ConcurrentWriteCallsToTheDifferentFiles)507 TEST_F(DebugEventsWriterTest, ConcurrentWriteCallsToTheDifferentFiles) {
508   const int32_t kConcurrentWrites = 30;
509   DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
510       dump_root_, tfdbg_run_id_, DebugEventsWriter::kDefaultCyclicBufferSize);
511   TF_ASSERT_OK(writer->Init());
512 
513   thread::ThreadPool* thread_pool =
514       new thread::ThreadPool(Env::Default(), "test_pool", 10);
515   std::atomic_int_fast32_t counter(0);
516   auto fn = [&writer, &counter]() {
517     const int32_t index = counter.fetch_add(1);
518     if (index % 3 == 0) {
519       SourceFile* source_file = new SourceFile();
520       source_file->set_file_path(
521           strings::Printf("/home/tf_programs/program_%.2d.py", index));
522       source_file->set_host_name("localhost.localdomain");
523       TF_ASSERT_OK(writer->WriteSourceFile(source_file));
524     } else if (index % 3 == 1) {
525       StackFrameWithId* stack_frame = new StackFrameWithId();
526       stack_frame->set_id(strings::Printf("e%.2d", index));
527       TF_ASSERT_OK(writer->WriteStackFrameWithId(stack_frame));
528     } else {
529       GraphOpCreation* op_creation = new GraphOpCreation();
530       op_creation->set_op_type("Log");
531       op_creation->set_op_name(strings::Printf("Log_%.2d", index));
532       TF_ASSERT_OK(writer->WriteGraphOpCreation(op_creation));
533     }
534   };
535   for (size_t i = 0; i < kConcurrentWrites; ++i) {
536     thread_pool->Schedule(fn);
537   }
538   delete thread_pool;
539 
540   TF_ASSERT_OK(writer->Close());
541 
542   std::vector<DebugEvent> actuals;
543   ReadDebugEventProtos(writer, DebugEventFileType::SOURCE_FILES, &actuals);
544   EXPECT_EQ(actuals.size(), kConcurrentWrites / 3);
545   std::vector<string> file_paths;
546   std::vector<string> host_names;
547   for (int32_t i = 0; i < kConcurrentWrites / 3; ++i) {
548     file_paths.push_back(actuals[i].source_file().file_path());
549     host_names.push_back(actuals[i].source_file().host_name());
550   }
551   std::sort(file_paths.begin(), file_paths.end());
552   for (int32_t i = 0; i < kConcurrentWrites / 3; ++i) {
553     EXPECT_EQ(file_paths[i],
554               strings::Printf("/home/tf_programs/program_%.2d.py", i * 3));
555     EXPECT_EQ(host_names[i], "localhost.localdomain");
556   }
557 
558   ReadDebugEventProtos(writer, DebugEventFileType::STACK_FRAMES, &actuals);
559   EXPECT_EQ(actuals.size(), kConcurrentWrites / 3);
560   std::vector<string> stack_frame_ids;
561   for (int32_t i = 0; i < kConcurrentWrites / 3; ++i) {
562     stack_frame_ids.push_back(actuals[i].stack_frame_with_id().id());
563   }
564   std::sort(stack_frame_ids.begin(), stack_frame_ids.end());
565   for (int32_t i = 0; i < kConcurrentWrites / 3; ++i) {
566     EXPECT_EQ(stack_frame_ids[i], strings::Printf("e%.2d", i * 3 + 1));
567   }
568 
569   ReadDebugEventProtos(writer, DebugEventFileType::GRAPHS, &actuals);
570   EXPECT_EQ(actuals.size(), kConcurrentWrites / 3);
571   std::vector<string> op_types;
572   std::vector<string> op_names;
573   for (int32_t i = 0; i < kConcurrentWrites / 3; ++i) {
574     op_types.push_back(actuals[i].graph_op_creation().op_type());
575     op_names.push_back(actuals[i].graph_op_creation().op_name());
576   }
577   std::sort(op_names.begin(), op_names.end());
578   for (int32_t i = 0; i < kConcurrentWrites / 3; ++i) {
579     EXPECT_EQ(op_types[i], "Log");
580     EXPECT_EQ(op_names[i], strings::Printf("Log_%.2d", i * 3 + 2));
581   }
582 }
583 
// Execution events must stay off disk until a flushing method is called.
TEST_F(DebugEventsWriterTest, WriteExecutionWithCyclicBufferNoFlush) {
  const size_t kCyclicBufferSize = 10;
  DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, kCyclicBufferSize);
  TF_ASSERT_OK(writer->Init());

  // Serially write twice as many events as the circular buffer holds,
  // without flushing.
  const size_t num_events = kCyclicBufferSize * 2;
  for (size_t tensor_id = 0; tensor_id < num_events; ++tensor_id) {
    Execution* event = new Execution();
    event->set_op_type("Log");
    event->add_input_tensor_ids(tensor_id);
    TF_ASSERT_OK(writer->WriteExecution(event));
  }

  // Nothing was flushed, so the .execution file should hold no records.
  std::vector<DebugEvent> on_disk;
  ReadDebugEventProtos(writer, DebugEventFileType::EXECUTION, &on_disk);
  EXPECT_EQ(on_disk.size(), 0);

  // Close the writer so the files can be safely deleted.
  TF_ASSERT_OK(writer->Close());
}
608 
TEST_F(DebugEventsWriterTest,WriteExecutionWithCyclicBufferFlush)609 TEST_F(DebugEventsWriterTest, WriteExecutionWithCyclicBufferFlush) {
610   // Verify that writing to disk happens when the flushing method is called.
611   const size_t kCyclicBufferSize = 10;
612   DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
613       dump_root_, tfdbg_run_id_, kCyclicBufferSize);
614   TF_ASSERT_OK(writer->Init());
615 
616   // First, try writing and flushing more debug events than the capacity
617   // of the circular buffer, in a serial fashion.
618   for (size_t i = 0; i < kCyclicBufferSize * 2; ++i) {
619     Execution* execution = new Execution();
620     execution->set_op_type("Log");
621     execution->add_input_tensor_ids(i);
622     TF_ASSERT_OK(writer->WriteExecution(execution));
623   }
624 
625   TF_ASSERT_OK(writer->FlushExecutionFiles());
626 
627   std::vector<DebugEvent> actuals;
628   // Expect there to be only the last kCyclicBufferSize debug events,
629   // and the order should be correct.
630   ReadDebugEventProtos(writer, DebugEventFileType::EXECUTION, &actuals);
631   EXPECT_EQ(actuals.size(), kCyclicBufferSize);
632   for (size_t i = 0; i < kCyclicBufferSize; ++i) {
633     EXPECT_EQ(actuals[i].execution().op_type(), "Log");
634     EXPECT_EQ(actuals[i].execution().input_tensor_ids().size(), 1);
635     EXPECT_EQ(actuals[i].execution().input_tensor_ids()[0],
636               kCyclicBufferSize + i);
637   }
638 
639   // Second, write more than the capacity of the circular buffer,
640   // in a concurrent fashion.
641   thread::ThreadPool* thread_pool =
642       new thread::ThreadPool(Env::Default(), "test_pool", 8);
643   std::atomic_int_fast64_t counter(0);
644   auto fn = [&writer, &counter]() {
645     Execution* execution = new Execution();
646     execution->set_op_type("Abs");
647     execution->add_input_tensor_ids(counter.fetch_add(1));
648     TF_ASSERT_OK(writer->WriteExecution(execution));
649   };
650   for (size_t i = 0; i < kCyclicBufferSize * 2; ++i) {
651     thread_pool->Schedule(fn);
652   }
653   delete thread_pool;
654   TF_ASSERT_OK(writer->Close());
655 
656   ReadDebugEventProtos(writer, DebugEventFileType::EXECUTION, &actuals);
657   // NOTE: This includes the files from the first stage above, because the
658   // .execution file hasn't changed.
659   EXPECT_EQ(actuals.size(), kCyclicBufferSize * 2);
660   for (size_t i = 0; i < kCyclicBufferSize; ++i) {
661     const size_t index = i + kCyclicBufferSize;
662     EXPECT_EQ(actuals[index].execution().op_type(), "Abs");
663     EXPECT_EQ(actuals[index].execution().input_tensor_ids().size(), 1);
664     EXPECT_GE(actuals[index].execution().input_tensor_ids()[0], 0);
665     EXPECT_LE(actuals[index].execution().input_tensor_ids()[0],
666               kCyclicBufferSize * 2);
667   }
668 
669   // Verify no cross-talk.
670   ReadDebugEventProtos(writer, DebugEventFileType::SOURCE_FILES, &actuals);
671   EXPECT_EQ(actuals.size(), 0);
672   ReadDebugEventProtos(writer, DebugEventFileType::STACK_FRAMES, &actuals);
673   EXPECT_EQ(actuals.size(), 0);
674   ReadDebugEventProtos(writer, DebugEventFileType::GRAPHS, &actuals);
675   EXPECT_EQ(actuals.size(), 0);
676   ReadDebugEventProtos(writer, DebugEventFileType::GRAPH_EXECUTION_TRACES,
677                        &actuals);
678   EXPECT_EQ(actuals.size(), 0);
679 }
680 
// GraphExecutionTrace events must stay off disk until a flushing method is
// called.
TEST_F(DebugEventsWriterTest, WriteGrahExecutionTraceWithCyclicBufferNoFlush) {
  const size_t kCyclicBufferSize = 10;
  DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, kCyclicBufferSize);
  TF_ASSERT_OK(writer->Init());

  // Serially write more debug events than the capacity of the circular
  // buffer, without flushing.
  // NOTE(review): the protos are never deleted here, so the writer presumably
  // takes ownership -- confirm against debug_events_writer.h.
  for (size_t i = 0; i < kCyclicBufferSize * 2; ++i) {
    GraphExecutionTrace* trace = new GraphExecutionTrace();
    // `i` is a size_t, so use the matching `z` length modifier.
    trace->set_tfdbg_context_id(strings::Printf("graph_%.2zu", i));
    TF_ASSERT_OK(writer->WriteGraphExecutionTrace(trace));
  }

  std::vector<DebugEvent> actuals;
  // Nothing was flushed, so the file should hold no records.
  ReadDebugEventProtos(writer, DebugEventFileType::GRAPH_EXECUTION_TRACES,
                       &actuals);
  EXPECT_EQ(actuals.size(), 0);

  // Close the writer so the files can be safely deleted.
  TF_ASSERT_OK(writer->Close());
}
705 
// Verifies that a GraphExecutionTrace can be written and flushed correctly even
// when `Init()` was never called on the writer.
TEST_F(DebugEventsWriterTest, WriteGrahExecutionTraceWithoutPreviousInitCall) {
  // Use a signed type for the negative size instead of relying on the
  // wrap-around of an unsigned `size_t`.
  const int kCyclicBufferSize = -1;
  DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, kCyclicBufferSize);
  // NOTE(cais): `writer->Init()` is not called here before
  // WriteGraphExecutionTrace() is called. This test checks that this is okay
  // and the `GraphExecutionTrace` gets written correctly even without `Init()`
  // being called first. This scenario can happen when a TF Graph with tfdbg
  // debug ops are executed on a remote TF server.

  GraphExecutionTrace* trace = new GraphExecutionTrace();
  trace->set_tfdbg_context_id("graph_0");
  TF_ASSERT_OK(writer->WriteGraphExecutionTrace(trace));
  TF_ASSERT_OK(writer->FlushExecutionFiles());

  std::vector<DebugEvent> actuals;
  ReadDebugEventProtos(writer, DebugEventFileType::GRAPH_EXECUTION_TRACES,
                       &actuals);
  // Fatal assertion: `actuals[0]` below must not be evaluated on an empty
  // vector.
  ASSERT_EQ(actuals.size(), 1);
  EXPECT_EQ(actuals[0].graph_execution_trace().tfdbg_context_id(), "graph_0");

  // Close the writer so the files can be safely deleted.
  TF_ASSERT_OK(writer->Close());
}
730 
TEST_F(DebugEventsWriterTest,WriteGrahExecutionTraceWithCyclicBufferFlush)731 TEST_F(DebugEventsWriterTest, WriteGrahExecutionTraceWithCyclicBufferFlush) {
732   const size_t kCyclicBufferSize = 10;
733   DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
734       dump_root_, tfdbg_run_id_, kCyclicBufferSize);
735   TF_ASSERT_OK(writer->Init());
736 
737   // First, try writing and flushing more debug events than the capacity
738   // of the circular buffer, in a serial fashion.
739   for (size_t i = 0; i < kCyclicBufferSize * 2; ++i) {
740     GraphExecutionTrace* trace = new GraphExecutionTrace();
741     trace->set_tfdbg_context_id(strings::Printf("graph_%.2ld", i));
742     TF_ASSERT_OK(writer->WriteGraphExecutionTrace(trace));
743   }
744 
745   TF_ASSERT_OK(writer->FlushExecutionFiles());
746 
747   std::vector<DebugEvent> actuals;
748   // Expect there to be only the last kCyclicBufferSize debug events,
749   // and the order should be correct.
750   ReadDebugEventProtos(writer, DebugEventFileType::GRAPH_EXECUTION_TRACES,
751                        &actuals);
752   EXPECT_EQ(actuals.size(), kCyclicBufferSize);
753   for (size_t i = 0; i < kCyclicBufferSize; ++i) {
754     EXPECT_EQ(actuals[i].graph_execution_trace().tfdbg_context_id(),
755               strings::Printf("graph_%.2ld", i + kCyclicBufferSize));
756   }
757 
758   // Second, write more than the capacity of the circular buffer,
759   // in a concurrent fashion.
760   thread::ThreadPool* thread_pool =
761       new thread::ThreadPool(Env::Default(), "test_pool", 8);
762   std::atomic_int_fast64_t counter(0);
763   auto fn = [&writer, &counter]() {
764     GraphExecutionTrace* trace = new GraphExecutionTrace();
765     trace->set_tfdbg_context_id(
766         strings::Printf("new_graph_%.2ld", counter.fetch_add(1)));
767     TF_ASSERT_OK(writer->WriteGraphExecutionTrace(trace));
768   };
769   for (size_t i = 0; i < kCyclicBufferSize * 2; ++i) {
770     thread_pool->Schedule(fn);
771   }
772   delete thread_pool;
773   TF_ASSERT_OK(writer->Close());
774 
775   ReadDebugEventProtos(writer, DebugEventFileType::GRAPH_EXECUTION_TRACES,
776                        &actuals);
777   // NOTE: This includes the files from the first stage above, because the
778   // .graph_execution_traces file hasn't changed.
779   EXPECT_EQ(actuals.size(), kCyclicBufferSize * 2);
780   for (size_t i = 0; i < kCyclicBufferSize; ++i) {
781     const size_t index = i + kCyclicBufferSize;
782     EXPECT_EQ(actuals[index].graph_execution_trace().tfdbg_context_id().find(
783                   "new_graph_"),
784               0);
785   }
786 
787   // Verify no cross-talk.
788   ReadDebugEventProtos(writer, DebugEventFileType::SOURCE_FILES, &actuals);
789   EXPECT_EQ(actuals.size(), 0);
790   ReadDebugEventProtos(writer, DebugEventFileType::STACK_FRAMES, &actuals);
791   EXPECT_EQ(actuals.size(), 0);
792   ReadDebugEventProtos(writer, DebugEventFileType::GRAPHS, &actuals);
793   EXPECT_EQ(actuals.size(), 0);
794   ReadDebugEventProtos(writer, DebugEventFileType::EXECUTION, &actuals);
795   EXPECT_EQ(actuals.size(), 0);
796 }
797 
// Verifies that concurrent calls to RegisterDeviceAndGetId() return the same ID
// for the same device name and distinct IDs for distinct names, and that each
// unique device appears exactly once in the GRAPHS file.
TEST_F(DebugEventsWriterTest, RegisterDeviceAndGetIdTrace) {
  // Static storage duration lets the closures below use these constants
  // without capturing them.
  static constexpr char kDeviceNamePrefix[] =
      "/job:localhost/replica:0/task:0/device:GPU:";
  static constexpr int kNumThreads = 8;
  static constexpr int kNumDevices = 4;

  DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, DebugEventsWriter::kDefaultCyclicBufferSize);
  TF_ASSERT_OK(writer->Init());

  // Register and get some device IDs in a concurrent fashion.
  int device_ids[kNumThreads] = {};
  {
    // The pool is destroyed at the end of this scope, which (like the explicit
    // `delete` it replaces) happens before `device_ids` is inspected.
    thread::ThreadPool thread_pool(Env::Default(), "test_pool", kNumThreads);
    for (int i = 0; i < kNumThreads; ++i) {
      thread_pool.Schedule([i, &writer, &device_ids]() {
        const string device_name =
            strings::Printf("%s%d", kDeviceNamePrefix, i % kNumDevices);
        device_ids[i] = writer->RegisterDeviceAndGetId(device_name);
      });
    }
  }
  TF_ASSERT_OK(writer->FlushNonExecutionFiles());
  TF_ASSERT_OK(writer->Close());

  // There should be only 4 unique device IDs, because there are only 4 unique
  // device names.
  EXPECT_EQ(device_ids[0], device_ids[4]);
  EXPECT_EQ(device_ids[1], device_ids[5]);
  EXPECT_EQ(device_ids[2], device_ids[6]);
  EXPECT_EQ(device_ids[3], device_ids[7]);
  // Assert that the four device IDs are all unique.
  EXPECT_EQ(
      absl::flat_hash_set<int>(device_ids, device_ids + kNumThreads).size(),
      kNumDevices);

  std::vector<DebugEvent> actuals;
  ReadDebugEventProtos(writer, DebugEventFileType::GRAPHS, &actuals);
  // Due to the `% kNumDevices`, there are only 4 unique device names, even
  // though there are 8 threads each calling `RegisterDeviceAndGetId`.
  EXPECT_EQ(actuals.size(), kNumDevices);
  const size_t prefix_length = sizeof(kDeviceNamePrefix) - 1;
  for (const DebugEvent& actual : actuals) {
    const string& device_name = actual.debugged_device().device_name();
    // Check the prefix first so that `substr()` below can never be asked for
    // a position past the end of the string.
    ASSERT_EQ(device_name.compare(0, prefix_length, kDeviceNamePrefix), 0)
        << "Unexpected device name: " << device_name;
    int device_index = -1;
    // Use test assertions rather than CHECK so that a malformed name fails
    // this test instead of aborting the whole test binary.
    ASSERT_TRUE(
        absl::SimpleAtoi(device_name.substr(prefix_length), &device_index))
        << "Unexpected device name: " << device_name;
    // Guard the lookup into `device_ids` against an out-of-range index.
    ASSERT_GE(device_index, 0);
    ASSERT_LT(device_index, kNumDevices);
    EXPECT_EQ(actual.debugged_device().device_id(), device_ids[device_index]);
  }
}
841 
// Verifies that with a cyclic buffer size of 0 every written Execution and
// GraphExecutionTrace event is present in the files after a flush, in the order
// written, i.e. no event is dropped.
TEST_F(DebugEventsWriterTest, DisableCyclicBufferBehavior) {
  const size_t kCyclicBufferSize = 0;  // A value <= 0 disables cyclic behavior.
  DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, kCyclicBufferSize);
  TF_ASSERT_OK(writer->Init());

  const size_t kNumEvents = 20;

  // Stage 1: Execution events.
  for (size_t i = 0; i < kNumEvents; ++i) {
    Execution* execution = new Execution();
    execution->set_op_type("Log");
    execution->add_input_tensor_ids(i);
    TF_ASSERT_OK(writer->WriteExecution(execution));
  }
  TF_ASSERT_OK(writer->FlushExecutionFiles());

  std::vector<DebugEvent> actuals;
  ReadDebugEventProtos(writer, DebugEventFileType::EXECUTION, &actuals);
  // Fatal assertion: the loop below indexes `actuals` up to kNumEvents.
  ASSERT_EQ(actuals.size(), kNumEvents);
  for (size_t i = 0; i < kNumEvents; ++i) {
    EXPECT_EQ(actuals[i].execution().op_type(), "Log");
    // Fatal assertion: element 0 is read on the next line.
    ASSERT_EQ(actuals[i].execution().input_tensor_ids().size(), 1);
    EXPECT_EQ(actuals[i].execution().input_tensor_ids()[0], i);
  }

  // Stage 2: GraphExecutionTrace events.
  for (size_t i = 0; i < kNumEvents; ++i) {
    GraphExecutionTrace* trace = new GraphExecutionTrace();
    // size_t values need the `z` length modifier; `%ld` is not portable.
    trace->set_tfdbg_context_id(strings::Printf("graph_%.2zu", i));
    TF_ASSERT_OK(writer->WriteGraphExecutionTrace(trace));
  }
  TF_ASSERT_OK(writer->FlushExecutionFiles());

  ReadDebugEventProtos(writer, DebugEventFileType::GRAPH_EXECUTION_TRACES,
                       &actuals);
  // Fatal assertion: the loop below indexes `actuals` up to kNumEvents.
  ASSERT_EQ(actuals.size(), kNumEvents);
  for (size_t i = 0; i < kNumEvents; ++i) {
    EXPECT_EQ(actuals[i].graph_execution_trace().tfdbg_context_id(),
              strings::Printf("graph_%.2zu", i));
  }

  // Close the writer so the files can be safely deleted.
  TF_ASSERT_OK(writer->Close());
}
885 
886 }  // namespace tfdbg
887 }  // namespace tensorflow
888