1 /* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #include "tensorflow/core/util/debug_events_writer.h"
17
18 #include <vector>
19
20 #include "absl/container/flat_hash_set.h"
21 #include "tensorflow/core/lib/core/status_test_util.h"
22 #include "tensorflow/core/lib/core/threadpool.h"
23 #include "tensorflow/core/lib/io/path.h"
24 #include "tensorflow/core/lib/io/record_reader.h"
25 #include "tensorflow/core/lib/strings/stringprintf.h"
26 #include "tensorflow/core/platform/test.h"
27 #include "tensorflow/core/protobuf/graph_debug_info.pb.h"
28
29 namespace tensorflow {
30 namespace tfdbg {
31
32 // shorthand
env()33 Env* env() { return Env::Default(); }
34
35 class DebugEventsWriterTest : public ::testing::Test {
36 public:
GetDebugEventFileName(DebugEventsWriter * writer,DebugEventFileType type)37 static string GetDebugEventFileName(DebugEventsWriter* writer,
38 DebugEventFileType type) {
39 return writer->FileName(type);
40 }
41
ReadDebugEventProtos(DebugEventsWriter * writer,DebugEventFileType type,std::vector<DebugEvent> * protos)42 static void ReadDebugEventProtos(DebugEventsWriter* writer,
43 DebugEventFileType type,
44 std::vector<DebugEvent>* protos) {
45 protos->clear();
46 const string filename = writer->FileName(type);
47 std::unique_ptr<RandomAccessFile> debug_events_file;
48 TF_CHECK_OK(env()->NewRandomAccessFile(filename, &debug_events_file));
49 io::RecordReader* reader = new io::RecordReader(debug_events_file.get());
50
51 uint64 offset = 0;
52 DebugEvent actual;
53 while (ReadDebugEventProto(reader, &offset, &actual)) {
54 protos->push_back(actual);
55 }
56
57 delete reader;
58 }
59
ReadDebugEventProto(io::RecordReader * reader,uint64 * offset,DebugEvent * proto)60 static bool ReadDebugEventProto(io::RecordReader* reader, uint64* offset,
61 DebugEvent* proto) {
62 tstring record;
63 Status s = reader->ReadRecord(offset, &record);
64 if (!s.ok()) {
65 return false;
66 }
67 return ParseProtoUnlimited(proto, record);
68 }
69
SetUp()70 void SetUp() override {
71 dump_root_ = io::JoinPath(
72 testing::TmpDir(),
73 strings::Printf("%010lld", static_cast<long long>(env()->NowMicros())));
74 tfdbg_run_id_ = "test_tfdbg_run_id";
75 }
76
TearDown()77 void TearDown() override {
78 if (env()->IsDirectory(dump_root_).ok()) {
79 int64_t undeleted_files = 0;
80 int64_t undeleted_dirs = 0;
81 TF_ASSERT_OK(env()->DeleteRecursively(dump_root_, &undeleted_files,
82 &undeleted_dirs));
83 ASSERT_EQ(0, undeleted_files);
84 ASSERT_EQ(0, undeleted_dirs);
85 }
86 }
87
88 string dump_root_;
89 string tfdbg_run_id_;
90 };
91
// Two lookups with the same dump root must hand back the same writer object.
TEST_F(DebugEventsWriterTest, GetDebugEventsWriterSameRootGivesSameObject) {
  auto* const first_lookup = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, DebugEventsWriter::kDefaultCyclicBufferSize);
  auto* const second_lookup = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, DebugEventsWriter::kDefaultCyclicBufferSize);

  EXPECT_EQ(first_lookup, second_lookup);
}
100
// Looks up the writer for one dump root from several threads at once and
// checks that every thread received the same object.
TEST_F(DebugEventsWriterTest, ConcurrentGetDebugEventsWriterSameDumpRoot) {
  const size_t kNumLookups = 4;

  std::vector<DebugEventsWriter*> writers;
  mutex mu;  // Guards `writers`, which is appended to from pool threads.
  auto fn = [this, &writers, &mu]() {
    DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
        dump_root_, tfdbg_run_id_, DebugEventsWriter::kDefaultCyclicBufferSize);
    {
      mutex_lock l(mu);
      writers.push_back(writer);
    }
  };
  {
    // Destroying the pool at the end of this scope is the synchronization
    // point of the test: `writers` is only inspected afterwards.
    thread::ThreadPool thread_pool(Env::Default(), "test_pool", 4);
    for (size_t i = 0; i < kNumLookups; ++i) {
      thread_pool.Schedule(fn);
    }
  }

  // Fatal assertion: the element accesses below would be out of bounds if
  // fewer results than expected were collected.
  ASSERT_EQ(writers.size(), kNumLookups);
  EXPECT_EQ(writers[0], writers[1]);
  EXPECT_EQ(writers[1], writers[2]);
  EXPECT_EQ(writers[2], writers[3]);
}
125
// Looks up writers for three distinct dump roots from several threads at once
// and checks that three distinct objects were returned.
TEST_F(DebugEventsWriterTest, ConcurrentGetDebugEventsWriterDiffDumpRoots) {
  const size_t kNumLookups = 3;

  std::atomic_int_fast64_t counter(0);
  std::vector<DebugEventsWriter*> writers;
  mutex mu;  // Guards `writers`, which is appended to from pool threads.
  auto fn = [this, &counter, &writers, &mu]() {
    // Each call gets its own sub-directory, named after the counter value.
    // The explicit cast keeps the printf-style argument in sync with the
    // conversion specifier regardless of how int_fast64_t is defined.
    const string new_dump_root = io::JoinPath(
        dump_root_,
        strings::Printf("%lld", static_cast<long long>(counter.fetch_add(1))));
    DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
        new_dump_root, tfdbg_run_id_,
        DebugEventsWriter::kDefaultCyclicBufferSize);
    {
      mutex_lock l(mu);
      writers.push_back(writer);
    }
  };
  {
    // Destroying the pool at the end of this scope is the synchronization
    // point of the test: `writers` is only inspected afterwards.
    thread::ThreadPool thread_pool(Env::Default(), "test_pool", 3);
    for (size_t i = 0; i < kNumLookups; ++i) {
      thread_pool.Schedule(fn);
    }
  }

  // Fatal assertion: the element accesses below would be out of bounds if
  // fewer results than expected were collected.
  ASSERT_EQ(writers.size(), kNumLookups);
  EXPECT_NE(writers[0], writers[1]);
  EXPECT_NE(writers[0], writers[2]);
  EXPECT_NE(writers[1], writers[2]);
}
154
// Writers requested for two different directories must be distinct objects.
TEST_F(DebugEventsWriterTest, GetDebugEventsWriterDifferentRoots) {
  const string nested_dump_root = io::JoinPath(dump_root_, "subdirectory");

  auto* const root_writer = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, DebugEventsWriter::kDefaultCyclicBufferSize);
  auto* const nested_writer = DebugEventsWriter::GetDebugEventsWriter(
      nested_dump_root, tfdbg_run_id_,
      DebugEventsWriter::kDefaultCyclicBufferSize);

  EXPECT_NE(root_writer, nested_writer);
}
164
// Init() followed by Close() must leave a metadata file with exactly one
// DebugEvent in it, plus readable but empty .source_files and .stack_frames
// files.
TEST_F(DebugEventsWriterTest, GetAndInitDebugEventsWriter) {
  DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, DebugEventsWriter::kDefaultCyclicBufferSize);
  TF_ASSERT_OK(writer->Init());
  TF_ASSERT_OK(writer->Close());

  // Verify the metadata file's content.
  std::vector<DebugEvent> actuals;
  ReadDebugEventProtos(writer, DebugEventFileType::METADATA, &actuals);
  // Fatal assertion: actuals[0] is dereferenced below.
  ASSERT_EQ(actuals.size(), 1);
  EXPECT_GT(actuals[0].debug_metadata().tensorflow_version().length(), 0);
  // The file version string must start with the version prefix and carry
  // something after it.
  const string file_version = actuals[0].debug_metadata().file_version();
  EXPECT_EQ(file_version.find(DebugEventsWriter::kVersionPrefix), 0);
  EXPECT_GT(file_version.size(), strlen(DebugEventsWriter::kVersionPrefix));
  // Check the tfdbg run ID.
  EXPECT_EQ(actuals[0].debug_metadata().tfdbg_run_id(), "test_tfdbg_run_id");

  // The .source_files file must exist (ReadDebugEventProtos CHECK-fails if it
  // cannot be opened) and contain no events.
  ReadDebugEventProtos(writer, DebugEventFileType::SOURCE_FILES, &actuals);
  EXPECT_EQ(actuals.size(), 0);
  // Same for the .stack_frames file.
  ReadDebugEventProtos(writer, DebugEventFileType::STACK_FRAMES, &actuals);
  EXPECT_EQ(actuals.size(), 0);
}
188
// Close() on a writer that was never initialized must report success.
TEST_F(DebugEventsWriterTest, CallingCloseWithoutInitIsOkay) {
  auto* const uninitialized_writer = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, DebugEventsWriter::kDefaultCyclicBufferSize);

  TF_ASSERT_OK(uninitialized_writer->Close());
}
194
// A second Close() on the same writer must report success as well.
TEST_F(DebugEventsWriterTest, CallingCloseTwiceIsOkay) {
  auto* const target = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, DebugEventsWriter::kDefaultCyclicBufferSize);

  // First call.
  TF_ASSERT_OK(target->Close());
  // Repeated call.
  TF_ASSERT_OK(target->Close());
}
201
// Calls Init() on one writer from several threads at once and verifies that
// the resulting files look the same as after a single Init() call.
TEST_F(DebugEventsWriterTest, ConcurrentInitCalls) {
  DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, DebugEventsWriter::kDefaultCyclicBufferSize);

  auto fn = [&writer]() { TF_ASSERT_OK(writer->Init()); };
  {
    // Destroying the pool at the end of this scope is the synchronization
    // point of the test: the writer is only closed and inspected afterwards.
    thread::ThreadPool thread_pool(Env::Default(), "test_pool", 4);
    for (size_t i = 0; i < 3; ++i) {
      thread_pool.Schedule(fn);
    }
  }

  TF_ASSERT_OK(writer->Close());

  // Verify the metadata file's content.
  std::vector<DebugEvent> actuals;
  ReadDebugEventProtos(writer, DebugEventFileType::METADATA, &actuals);
  // Fatal assertion: actuals[0] is dereferenced below.
  ASSERT_EQ(actuals.size(), 1);
  EXPECT_GT(actuals[0].debug_metadata().tensorflow_version().length(), 0);
  // The file version string must start with the version prefix and carry
  // something after it.
  const string file_version = actuals[0].debug_metadata().file_version();
  EXPECT_EQ(file_version.find(DebugEventsWriter::kVersionPrefix), 0);
  EXPECT_GT(file_version.size(), strlen(DebugEventsWriter::kVersionPrefix));
  EXPECT_EQ(actuals[0].debug_metadata().tfdbg_run_id(), "test_tfdbg_run_id");

  // The .source_files file must exist (ReadDebugEventProtos CHECK-fails if it
  // cannot be opened) and contain no events.
  ReadDebugEventProtos(writer, DebugEventFileType::SOURCE_FILES, &actuals);
  EXPECT_EQ(actuals.size(), 0);
  // Same for the .stack_frames file.
  ReadDebugEventProtos(writer, DebugEventFileType::STACK_FRAMES, &actuals);
  EXPECT_EQ(actuals.size(), 0);
}
233
// Init() must be idempotent: a second call keeps the metadata file name and
// does not add another metadata event.
TEST_F(DebugEventsWriterTest, InitTwiceDoesNotCreateNewMetadataFile) {
  DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, DebugEventsWriter::kDefaultCyclicBufferSize);
  TF_ASSERT_OK(writer->Init());

  // State after the first Init().
  std::vector<DebugEvent> actuals;
  ReadDebugEventProtos(writer, DebugEventFileType::METADATA, &actuals);
  // Fatal assertion: actuals[0] is dereferenced below.
  ASSERT_EQ(actuals.size(), 1);
  EXPECT_GT(actuals[0].debug_metadata().tensorflow_version().length(), 0);
  EXPECT_EQ(actuals[0].debug_metadata().tfdbg_run_id(), "test_tfdbg_run_id");
  // Strictly greater than zero: a ">= 0" check on an unsigned size can never
  // fail and would verify nothing.
  EXPECT_GT(actuals[0].debug_metadata().file_version().size(), 0);

  const string metadata_path_1 =
      GetDebugEventFileName(writer, DebugEventFileType::METADATA);
  TF_ASSERT_OK(writer->Init());
  EXPECT_EQ(GetDebugEventFileName(writer, DebugEventFileType::METADATA),
            metadata_path_1);
  TF_ASSERT_OK(writer->Close());

  // State after the second Init() and Close(): still exactly one event.
  ReadDebugEventProtos(writer, DebugEventFileType::METADATA, &actuals);
  ASSERT_EQ(actuals.size(), 1);
  EXPECT_GT(actuals[0].debug_metadata().tensorflow_version().length(), 0);
  EXPECT_EQ(actuals[0].debug_metadata().tfdbg_run_id(), "test_tfdbg_run_id");
  EXPECT_GT(actuals[0].debug_metadata().file_version().size(), 0);
}
261
// Writes two SourceFile protos and verifies that they are read back from the
// .source_files file in order, with the other files left empty.
TEST_F(DebugEventsWriterTest, WriteSourceFile) {
  DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, DebugEventsWriter::kDefaultCyclicBufferSize);
  TF_ASSERT_OK(writer->Init());

  // NOTE(review): the protos are heap-allocated and never deleted here;
  // presumably WriteSourceFile() takes ownership - confirm against the
  // DebugEventsWriter header.
  SourceFile* source_file_1 = new SourceFile();
  source_file_1->set_file_path("/home/tf_programs/main.py");
  source_file_1->set_host_name("localhost.localdomain");
  source_file_1->add_lines("import tensorflow as tf");
  source_file_1->add_lines("");
  source_file_1->add_lines("print(tf.constant([42.0]))");
  source_file_1->add_lines("");
  TF_ASSERT_OK(writer->WriteSourceFile(source_file_1));

  SourceFile* source_file_2 = new SourceFile();
  source_file_2->set_file_path("/home/tf_programs/train.py");
  source_file_2->set_host_name("localhost.localdomain");
  source_file_2->add_lines("import tensorflow.keras as keras");
  source_file_2->add_lines("");
  source_file_2->add_lines("model = keras.Sequential()");
  TF_ASSERT_OK(writer->WriteSourceFile(source_file_2));

  TF_ASSERT_OK(writer->FlushNonExecutionFiles());
  TF_ASSERT_OK(writer->Close());

  std::vector<DebugEvent> actuals;
  ReadDebugEventProtos(writer, DebugEventFileType::SOURCE_FILES, &actuals);
  // Fatal assertion: actuals[0] and actuals[1] are dereferenced below.
  ASSERT_EQ(actuals.size(), 2);
  EXPECT_GT(actuals[0].wall_time(), 0);
  EXPECT_GT(actuals[1].wall_time(), actuals[0].wall_time());

  // References instead of copies; `actuals` is not modified until both have
  // been checked.
  const SourceFile& actual_source_file_1 = actuals[0].source_file();
  EXPECT_EQ(actual_source_file_1.file_path(), "/home/tf_programs/main.py");
  EXPECT_EQ(actual_source_file_1.host_name(), "localhost.localdomain");
  // Fatal assertion: the individual lines are indexed below.
  ASSERT_EQ(actual_source_file_1.lines().size(), 4);
  EXPECT_EQ(actual_source_file_1.lines()[0], "import tensorflow as tf");
  EXPECT_EQ(actual_source_file_1.lines()[1], "");
  EXPECT_EQ(actual_source_file_1.lines()[2], "print(tf.constant([42.0]))");
  EXPECT_EQ(actual_source_file_1.lines()[3], "");

  const SourceFile& actual_source_file_2 = actuals[1].source_file();
  EXPECT_EQ(actual_source_file_2.file_path(), "/home/tf_programs/train.py");
  EXPECT_EQ(actual_source_file_2.host_name(), "localhost.localdomain");
  ASSERT_EQ(actual_source_file_2.lines().size(), 3);
  EXPECT_EQ(actual_source_file_2.lines()[0],
            "import tensorflow.keras as keras");
  EXPECT_EQ(actual_source_file_2.lines()[1], "");
  EXPECT_EQ(actual_source_file_2.lines()[2], "model = keras.Sequential()");

  // Verify no cross talk in the other debug-event files.
  ReadDebugEventProtos(writer, DebugEventFileType::STACK_FRAMES, &actuals);
  EXPECT_EQ(actuals.size(), 0);
  ReadDebugEventProtos(writer, DebugEventFileType::GRAPHS, &actuals);
  EXPECT_EQ(actuals.size(), 0);
  ReadDebugEventProtos(writer, DebugEventFileType::EXECUTION, &actuals);
  EXPECT_EQ(actuals.size(), 0);
  ReadDebugEventProtos(writer, DebugEventFileType::GRAPH_EXECUTION_TRACES,
                       &actuals);
  EXPECT_EQ(actuals.size(), 0);
}
322
// Writes two StackFrameWithId protos and verifies that they are read back from
// the .stack_frames file in order, with the other non-execution files left
// empty.
TEST_F(DebugEventsWriterTest, WriteStackFramesFile) {
  DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, DebugEventsWriter::kDefaultCyclicBufferSize);
  TF_ASSERT_OK(writer->Init());

  // NOTE(review): the protos are heap-allocated and never deleted here;
  // presumably WriteStackFrameWithId() takes ownership - confirm against the
  // DebugEventsWriter header.
  StackFrameWithId* stack_frame_1 = new StackFrameWithId();
  stack_frame_1->set_id("deadbeaf");
  GraphDebugInfo::FileLineCol* file_line_col =
      stack_frame_1->mutable_file_line_col();
  file_line_col->set_file_index(12);
  file_line_col->set_line(20);
  file_line_col->set_col(2);
  file_line_col->set_func("my_func");
  file_line_col->set_code("  x = y + z");
  // Each frame is handed to the writer as soon as it is built, so a fatal
  // failure on the first write cannot strand an already-allocated second one.
  TF_ASSERT_OK(writer->WriteStackFrameWithId(stack_frame_1));

  StackFrameWithId* stack_frame_2 = new StackFrameWithId();
  stack_frame_2->set_id("eeeeeeec");
  file_line_col = stack_frame_2->mutable_file_line_col();
  file_line_col->set_file_index(12);
  file_line_col->set_line(21);
  file_line_col->set_col(4);
  file_line_col->set_func("my_func");
  file_line_col->set_code("  x = x ** 2.0");
  TF_ASSERT_OK(writer->WriteStackFrameWithId(stack_frame_2));

  TF_ASSERT_OK(writer->FlushNonExecutionFiles());
  TF_ASSERT_OK(writer->Close());

  std::vector<DebugEvent> actuals;
  ReadDebugEventProtos(writer, DebugEventFileType::STACK_FRAMES, &actuals);
  // Fatal assertion: actuals[0] and actuals[1] are dereferenced below.
  ASSERT_EQ(actuals.size(), 2);
  EXPECT_GT(actuals[0].wall_time(), 0);
  EXPECT_GT(actuals[1].wall_time(), actuals[0].wall_time());

  // References instead of copies; `actuals` is not modified until both have
  // been checked.
  const StackFrameWithId& actual_stack_frame_1 =
      actuals[0].stack_frame_with_id();
  EXPECT_EQ(actual_stack_frame_1.id(), "deadbeaf");
  const GraphDebugInfo::FileLineCol& file_line_col_1 =
      actual_stack_frame_1.file_line_col();
  EXPECT_EQ(file_line_col_1.file_index(), 12);
  EXPECT_EQ(file_line_col_1.line(), 20);
  EXPECT_EQ(file_line_col_1.col(), 2);
  EXPECT_EQ(file_line_col_1.func(), "my_func");
  EXPECT_EQ(file_line_col_1.code(), "  x = y + z");

  const StackFrameWithId& actual_stack_frame_2 =
      actuals[1].stack_frame_with_id();
  EXPECT_EQ(actual_stack_frame_2.id(), "eeeeeeec");
  const GraphDebugInfo::FileLineCol& file_line_col_2 =
      actual_stack_frame_2.file_line_col();
  EXPECT_EQ(file_line_col_2.file_index(), 12);
  EXPECT_EQ(file_line_col_2.line(), 21);
  EXPECT_EQ(file_line_col_2.col(), 4);
  EXPECT_EQ(file_line_col_2.func(), "my_func");
  EXPECT_EQ(file_line_col_2.code(), "  x = x ** 2.0");

  // Verify no cross talk in the other non-execution debug-event files.
  ReadDebugEventProtos(writer, DebugEventFileType::SOURCE_FILES, &actuals);
  EXPECT_EQ(actuals.size(), 0);
  ReadDebugEventProtos(writer, DebugEventFileType::GRAPHS, &actuals);
  EXPECT_EQ(actuals.size(), 0);
}
384
// Writes one GraphOpCreation and one DebuggedGraph proto and verifies that
// both are read back from the .graphs file in order, with the other
// non-execution files left empty.
TEST_F(DebugEventsWriterTest, WriteGraphOpCreationAndDebuggedGraph) {
  DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, DebugEventsWriter::kDefaultCyclicBufferSize);
  TF_ASSERT_OK(writer->Init());

  // NOTE(review): the protos are heap-allocated and never deleted here;
  // presumably the Write*() methods take ownership - confirm against the
  // DebugEventsWriter header.
  GraphOpCreation* graph_op_creation = new GraphOpCreation();
  graph_op_creation->set_op_type("MatMul");
  graph_op_creation->set_op_name("Dense_1/MatMul");
  TF_ASSERT_OK(writer->WriteGraphOpCreation(graph_op_creation));

  DebuggedGraph* debugged_graph = new DebuggedGraph();
  debugged_graph->set_graph_id("deadbeaf");
  debugged_graph->set_graph_name("my_func_graph");
  TF_ASSERT_OK(writer->WriteDebuggedGraph(debugged_graph));

  TF_ASSERT_OK(writer->FlushNonExecutionFiles());
  TF_ASSERT_OK(writer->Close());

  std::vector<DebugEvent> actuals;
  ReadDebugEventProtos(writer, DebugEventFileType::GRAPHS, &actuals);
  // Fatal assertion: actuals[0] and actuals[1] are dereferenced below.
  ASSERT_EQ(actuals.size(), 2);
  EXPECT_GT(actuals[0].wall_time(), 0);
  EXPECT_GT(actuals[1].wall_time(), actuals[0].wall_time());

  // References instead of copies; `actuals` is not modified until both have
  // been checked.
  const GraphOpCreation& actual_op_creation = actuals[0].graph_op_creation();
  EXPECT_EQ(actual_op_creation.op_type(), "MatMul");
  EXPECT_EQ(actual_op_creation.op_name(), "Dense_1/MatMul");

  const DebuggedGraph& actual_debugged_graph = actuals[1].debugged_graph();
  EXPECT_EQ(actual_debugged_graph.graph_id(), "deadbeaf");
  EXPECT_EQ(actual_debugged_graph.graph_name(), "my_func_graph");

  // Verify no cross talk in the other non-execution debug-event files.
  ReadDebugEventProtos(writer, DebugEventFileType::SOURCE_FILES, &actuals);
  EXPECT_EQ(actuals.size(), 0);
  ReadDebugEventProtos(writer, DebugEventFileType::STACK_FRAMES, &actuals);
  EXPECT_EQ(actuals.size(), 0);
}
423
// Issues many WriteSourceFile() calls on one writer from a thread pool and
// verifies that each written event shows up exactly once in .source_files.
TEST_F(DebugEventsWriterTest, ConcurrentWriteCallsToTheSameFile) {
  const size_t kConcurrentWrites = 100;
  DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, DebugEventsWriter::kDefaultCyclicBufferSize);
  TF_ASSERT_OK(writer->Init());

  std::atomic_int_fast64_t counter(0);
  auto fn = [&writer, &counter]() {
    // The explicit cast keeps the printf-style argument in sync with the
    // conversion specifier regardless of how int_fast64_t is defined.
    const string file_path =
        strings::Printf("/home/tf_programs/program_%.3lld.py",
                        static_cast<long long>(counter.fetch_add(1)));
    // NOTE(review): heap-allocated and never deleted here; presumably
    // WriteSourceFile() takes ownership - confirm.
    SourceFile* source_file = new SourceFile();
    source_file->set_file_path(file_path);
    source_file->set_host_name("localhost.localdomain");
    TF_ASSERT_OK(writer->WriteSourceFile(source_file));
  };
  {
    // Destroying the pool at the end of this scope is the synchronization
    // point of the test: the writer is only closed and inspected afterwards.
    thread::ThreadPool thread_pool(Env::Default(), "test_pool", 8);
    for (size_t i = 0; i < kConcurrentWrites; ++i) {
      thread_pool.Schedule(fn);
    }
  }

  TF_ASSERT_OK(writer->Close());

  std::vector<DebugEvent> actuals;
  ReadDebugEventProtos(writer, DebugEventFileType::SOURCE_FILES, &actuals);
  // Fatal assertion: the loop below indexes the first kConcurrentWrites
  // elements of `actuals`.
  ASSERT_EQ(actuals.size(), kConcurrentWrites);
  std::vector<string> file_paths;
  std::vector<string> host_names;
  for (size_t i = 0; i < kConcurrentWrites; ++i) {
    file_paths.push_back(actuals[i].source_file().file_path());
    host_names.push_back(actuals[i].source_file().host_name());
  }
  // The write order is not deterministic, so compare in sorted order. The
  // zero-padded numbers make lexicographic and numeric order agree.
  std::sort(file_paths.begin(), file_paths.end());
  for (size_t i = 0; i < kConcurrentWrites; ++i) {
    EXPECT_EQ(file_paths[i],
              strings::Printf("/home/tf_programs/program_%.3lld.py",
                              static_cast<long long>(i)));
    EXPECT_EQ(host_names[i], "localhost.localdomain");
  }
}
464
// Like ConcurrentWriteCallsToTheSameFile, but every task additionally calls
// FlushNonExecutionFiles() right after its write.
TEST_F(DebugEventsWriterTest, ConcurrentWriteAndFlushCallsToTheSameFile) {
  const size_t kConcurrentWrites = 100;
  DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, DebugEventsWriter::kDefaultCyclicBufferSize);
  TF_ASSERT_OK(writer->Init());

  std::atomic_int_fast64_t counter(0);
  auto fn = [&writer, &counter]() {
    // The explicit cast keeps the printf-style argument in sync with the
    // conversion specifier regardless of how int_fast64_t is defined.
    const string file_path =
        strings::Printf("/home/tf_programs/program_%.3lld.py",
                        static_cast<long long>(counter.fetch_add(1)));
    // NOTE(review): heap-allocated and never deleted here; presumably
    // WriteSourceFile() takes ownership - confirm.
    SourceFile* source_file = new SourceFile();
    source_file->set_file_path(file_path);
    source_file->set_host_name("localhost.localdomain");
    TF_ASSERT_OK(writer->WriteSourceFile(source_file));
    TF_ASSERT_OK(writer->FlushNonExecutionFiles());
  };
  {
    // Destroying the pool at the end of this scope is the synchronization
    // point of the test: the writer is only closed and inspected afterwards.
    thread::ThreadPool thread_pool(Env::Default(), "test_pool", 8);
    for (size_t i = 0; i < kConcurrentWrites; ++i) {
      thread_pool.Schedule(fn);
    }
  }

  TF_ASSERT_OK(writer->Close());

  std::vector<DebugEvent> actuals;
  ReadDebugEventProtos(writer, DebugEventFileType::SOURCE_FILES, &actuals);
  // Fatal assertion: the loop below indexes the first kConcurrentWrites
  // elements of `actuals`.
  ASSERT_EQ(actuals.size(), kConcurrentWrites);
  std::vector<string> file_paths;
  std::vector<string> host_names;
  for (size_t i = 0; i < kConcurrentWrites; ++i) {
    file_paths.push_back(actuals[i].source_file().file_path());
    host_names.push_back(actuals[i].source_file().host_name());
  }
  // The write order is not deterministic, so compare in sorted order. The
  // zero-padded numbers make lexicographic and numeric order agree.
  std::sort(file_paths.begin(), file_paths.end());
  for (size_t i = 0; i < kConcurrentWrites; ++i) {
    EXPECT_EQ(file_paths[i],
              strings::Printf("/home/tf_programs/program_%.3lld.py",
                              static_cast<long long>(i)));
    EXPECT_EQ(host_names[i], "localhost.localdomain");
  }
}
506
// Issues concurrent writes that are spread round-robin over three event types
// (source files, stack frames, graph op creations) and verifies that each of
// the three files received exactly its own third of the events.
TEST_F(DebugEventsWriterTest, ConcurrentWriteCallsToTheDifferentFiles) {
  const int32_t kConcurrentWrites = 30;
  // Task indices 0, 3, 6, ... go to one file, 1, 4, 7, ... to the next, etc.
  const int32_t kWritesPerFile = kConcurrentWrites / 3;
  DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, DebugEventsWriter::kDefaultCyclicBufferSize);
  TF_ASSERT_OK(writer->Init());

  std::atomic_int_fast32_t counter(0);
  auto fn = [&writer, &counter]() {
    // NOTE(review): the protos below are heap-allocated and never deleted
    // here; presumably the Write*() methods take ownership - confirm.
    const int32_t index = counter.fetch_add(1);
    if (index % 3 == 0) {
      SourceFile* source_file = new SourceFile();
      source_file->set_file_path(
          strings::Printf("/home/tf_programs/program_%.2d.py", index));
      source_file->set_host_name("localhost.localdomain");
      TF_ASSERT_OK(writer->WriteSourceFile(source_file));
    } else if (index % 3 == 1) {
      StackFrameWithId* stack_frame = new StackFrameWithId();
      stack_frame->set_id(strings::Printf("e%.2d", index));
      TF_ASSERT_OK(writer->WriteStackFrameWithId(stack_frame));
    } else {
      GraphOpCreation* op_creation = new GraphOpCreation();
      op_creation->set_op_type("Log");
      op_creation->set_op_name(strings::Printf("Log_%.2d", index));
      TF_ASSERT_OK(writer->WriteGraphOpCreation(op_creation));
    }
  };
  {
    // Destroying the pool at the end of this scope is the synchronization
    // point of the test: the writer is only closed and inspected afterwards.
    thread::ThreadPool thread_pool(Env::Default(), "test_pool", 10);
    // Same signed type as the bound, to avoid a signed/unsigned comparison.
    for (int32_t i = 0; i < kConcurrentWrites; ++i) {
      thread_pool.Schedule(fn);
    }
  }

  TF_ASSERT_OK(writer->Close());

  // Source files. The write order is not deterministic, so the paths are
  // sorted before comparison; two-digit zero padding keeps that order numeric.
  std::vector<DebugEvent> actuals;
  ReadDebugEventProtos(writer, DebugEventFileType::SOURCE_FILES, &actuals);
  // Fatal assertion: the loop below indexes the first kWritesPerFile elements.
  ASSERT_EQ(actuals.size(), kWritesPerFile);
  std::vector<string> file_paths;
  std::vector<string> host_names;
  for (int32_t i = 0; i < kWritesPerFile; ++i) {
    file_paths.push_back(actuals[i].source_file().file_path());
    host_names.push_back(actuals[i].source_file().host_name());
  }
  std::sort(file_paths.begin(), file_paths.end());
  for (int32_t i = 0; i < kWritesPerFile; ++i) {
    EXPECT_EQ(file_paths[i],
              strings::Printf("/home/tf_programs/program_%.2d.py", i * 3));
    EXPECT_EQ(host_names[i], "localhost.localdomain");
  }

  // Stack frames.
  ReadDebugEventProtos(writer, DebugEventFileType::STACK_FRAMES, &actuals);
  ASSERT_EQ(actuals.size(), kWritesPerFile);
  std::vector<string> stack_frame_ids;
  for (int32_t i = 0; i < kWritesPerFile; ++i) {
    stack_frame_ids.push_back(actuals[i].stack_frame_with_id().id());
  }
  std::sort(stack_frame_ids.begin(), stack_frame_ids.end());
  for (int32_t i = 0; i < kWritesPerFile; ++i) {
    EXPECT_EQ(stack_frame_ids[i], strings::Printf("e%.2d", i * 3 + 1));
  }

  // Graph op creations.
  ReadDebugEventProtos(writer, DebugEventFileType::GRAPHS, &actuals);
  ASSERT_EQ(actuals.size(), kWritesPerFile);
  std::vector<string> op_types;
  std::vector<string> op_names;
  for (int32_t i = 0; i < kWritesPerFile; ++i) {
    op_types.push_back(actuals[i].graph_op_creation().op_type());
    op_names.push_back(actuals[i].graph_op_creation().op_name());
  }
  std::sort(op_names.begin(), op_names.end());
  for (int32_t i = 0; i < kWritesPerFile; ++i) {
    EXPECT_EQ(op_types[i], "Log");
    EXPECT_EQ(op_names[i], strings::Printf("Log_%.2d", i * 3 + 2));
  }
}
583
// Execution events must stay off the disk for as long as the flushing method
// has not been called.
TEST_F(DebugEventsWriterTest, WriteExecutionWithCyclicBufferNoFlush) {
  const size_t kCyclicBufferSize = 10;
  DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, kCyclicBufferSize);
  TF_ASSERT_OK(writer->Init());

  // Serially write twice as many events as the circular buffer can hold.
  const size_t num_events = kCyclicBufferSize * 2;
  size_t event_index = 0;
  while (event_index < num_events) {
    Execution* pending = new Execution();
    pending->set_op_type("Log");
    pending->add_input_tensor_ids(event_index);
    TF_ASSERT_OK(writer->WriteExecution(pending));
    ++event_index;
  }

  // FlushExecutionFiles() has not been called, so nothing should be readable
  // from the .execution file yet.
  std::vector<DebugEvent> events_on_disk;
  ReadDebugEventProtos(writer, DebugEventFileType::EXECUTION, &events_on_disk);
  EXPECT_EQ(events_on_disk.size(), 0);

  // Close the writer so the files can be safely deleted.
  TF_ASSERT_OK(writer->Close());
}
608
// Verifies that FlushExecutionFiles() and Close() put the contents of the
// circular buffer on disk, and that only the most recent kCyclicBufferSize
// events survive when more than that were written.
TEST_F(DebugEventsWriterTest, WriteExecutionWithCyclicBufferFlush) {
  const size_t kCyclicBufferSize = 10;
  DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, kCyclicBufferSize);
  TF_ASSERT_OK(writer->Init());

  // First, serially write more debug events than the circular buffer can
  // hold, then flush.
  // NOTE(review): the protos are heap-allocated and never deleted here;
  // presumably WriteExecution() takes ownership - confirm.
  for (size_t i = 0; i < kCyclicBufferSize * 2; ++i) {
    Execution* execution = new Execution();
    execution->set_op_type("Log");
    execution->add_input_tensor_ids(i);
    TF_ASSERT_OK(writer->WriteExecution(execution));
  }

  TF_ASSERT_OK(writer->FlushExecutionFiles());

  std::vector<DebugEvent> actuals;
  // Expect there to be only the last kCyclicBufferSize debug events,
  // and the order should be correct.
  ReadDebugEventProtos(writer, DebugEventFileType::EXECUTION, &actuals);
  // Fatal assertion: the loop below indexes the first kCyclicBufferSize
  // elements of `actuals`.
  ASSERT_EQ(actuals.size(), kCyclicBufferSize);
  for (size_t i = 0; i < kCyclicBufferSize; ++i) {
    EXPECT_EQ(actuals[i].execution().op_type(), "Log");
    // Fatal assertion: input_tensor_ids()[0] is read next.
    ASSERT_EQ(actuals[i].execution().input_tensor_ids().size(), 1);
    EXPECT_EQ(actuals[i].execution().input_tensor_ids()[0],
              kCyclicBufferSize + i);
  }

  // Second, write more than the capacity of the circular buffer,
  // in a concurrent fashion.
  std::atomic_int_fast64_t counter(0);
  auto fn = [&writer, &counter]() {
    Execution* execution = new Execution();
    execution->set_op_type("Abs");
    execution->add_input_tensor_ids(counter.fetch_add(1));
    TF_ASSERT_OK(writer->WriteExecution(execution));
  };
  {
    // Destroying the pool at the end of this scope is the synchronization
    // point of the test: the writer is only closed and inspected afterwards.
    thread::ThreadPool thread_pool(Env::Default(), "test_pool", 8);
    for (size_t i = 0; i < kCyclicBufferSize * 2; ++i) {
      thread_pool.Schedule(fn);
    }
  }
  TF_ASSERT_OK(writer->Close());

  ReadDebugEventProtos(writer, DebugEventFileType::EXECUTION, &actuals);
  // NOTE: This includes the events from the first stage above, because the
  // .execution file hasn't changed.
  ASSERT_EQ(actuals.size(), kCyclicBufferSize * 2);
  for (size_t i = 0; i < kCyclicBufferSize; ++i) {
    const size_t index = i + kCyclicBufferSize;
    EXPECT_EQ(actuals[index].execution().op_type(), "Abs");
    ASSERT_EQ(actuals[index].execution().input_tensor_ids().size(), 1);
    // Which concurrent writes survive is not deterministic, so only the value
    // range is checked.
    EXPECT_GE(actuals[index].execution().input_tensor_ids()[0], 0);
    EXPECT_LE(actuals[index].execution().input_tensor_ids()[0],
              kCyclicBufferSize * 2);
  }

  // Verify no cross-talk.
  ReadDebugEventProtos(writer, DebugEventFileType::SOURCE_FILES, &actuals);
  EXPECT_EQ(actuals.size(), 0);
  ReadDebugEventProtos(writer, DebugEventFileType::STACK_FRAMES, &actuals);
  EXPECT_EQ(actuals.size(), 0);
  ReadDebugEventProtos(writer, DebugEventFileType::GRAPHS, &actuals);
  EXPECT_EQ(actuals.size(), 0);
  ReadDebugEventProtos(writer, DebugEventFileType::GRAPH_EXECUTION_TRACES,
                       &actuals);
  EXPECT_EQ(actuals.size(), 0);
}
680
// GraphExecutionTrace events must stay off the disk for as long as the
// flushing method has not been called.
TEST_F(DebugEventsWriterTest, WriteGrahExecutionTraceWithCyclicBufferNoFlush) {
  const size_t kCyclicBufferSize = 10;
  DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, kCyclicBufferSize);
  TF_ASSERT_OK(writer->Init());

  // Serially write more debug events than the circular buffer can hold,
  // without flushing.
  // NOTE(review): the protos are heap-allocated and never deleted here;
  // presumably WriteGraphExecutionTrace() takes ownership - confirm.
  for (size_t i = 0; i < kCyclicBufferSize * 2; ++i) {
    GraphExecutionTrace* trace = new GraphExecutionTrace();
    // The explicit cast keeps the printf-style argument in sync with the
    // conversion specifier; a size_t must not be passed to "%ld".
    trace->set_tfdbg_context_id(
        strings::Printf("graph_%.2lld", static_cast<long long>(i)));
    TF_ASSERT_OK(writer->WriteGraphExecutionTrace(trace));
  }

  std::vector<DebugEvent> actuals;
  // Before FlushExecutionFiles() is called, the file should be empty.
  ReadDebugEventProtos(writer, DebugEventFileType::GRAPH_EXECUTION_TRACES,
                       &actuals);
  EXPECT_EQ(actuals.size(), 0);

  // Close the writer so the files can be safely deleted.
  TF_ASSERT_OK(writer->Close());
}
705
// Writing a GraphExecutionTrace must work even if Init() was never called on
// the writer.
TEST_F(DebugEventsWriterTest, WriteGrahExecutionTraceWithoutPreviousInitCall) {
  // -1 wraps around to the largest size_t value here.
  // NOTE(review): presumably the writer receives this as a signed,
  // non-positive size meaning "no cyclic buffer" - confirm against the
  // GetDebugEventsWriter() signature.
  const size_t kCyclicBufferSize = -1;
  DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, kCyclicBufferSize);
  // NOTE(cais): `writer->Init()` is not called here before
  // WriteGraphExecutionTrace() is called. This test checks that this is okay
  // and the `GraphExecutionTrace` gets written correctly even without `Init()`
  // being called first. This scenario can happen when a TF Graph with tfdbg
  // debug ops are executed on a remote TF server.

  GraphExecutionTrace* trace = new GraphExecutionTrace();
  // Plain literal: there is nothing to format.
  trace->set_tfdbg_context_id("graph_0");
  TF_ASSERT_OK(writer->WriteGraphExecutionTrace(trace));
  TF_ASSERT_OK(writer->FlushExecutionFiles());

  std::vector<DebugEvent> actuals;
  ReadDebugEventProtos(writer, DebugEventFileType::GRAPH_EXECUTION_TRACES,
                       &actuals);
  // Fatal assertion: actuals[0] is dereferenced below.
  ASSERT_EQ(actuals.size(), 1);
  EXPECT_EQ(actuals[0].graph_execution_trace().tfdbg_context_id(), "graph_0");

  // Close the writer so the files can be safely deleted.
  TF_ASSERT_OK(writer->Close());
}
730
// Verifies the cyclic-buffer behavior for graph execution traces: after
// overfilling the buffer and flushing, only the most recent kCyclicBufferSize
// events are on disk, both for serial and for concurrent writers, and no
// events leak into the other debug-event files.
TEST_F(DebugEventsWriterTest, WriteGrahExecutionTraceWithCyclicBufferFlush) {
  const size_t kCyclicBufferSize = 10;
  DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, kCyclicBufferSize);
  TF_ASSERT_OK(writer->Init());

  // First, try writing and flushing more debug events than the capacity
  // of the circular buffer, in a serial fashion.
  for (size_t i = 0; i < kCyclicBufferSize * 2; ++i) {
    GraphExecutionTrace* trace = new GraphExecutionTrace();
    // `%zu` is the portable conversion for size_t (`%ld` is not).
    trace->set_tfdbg_context_id(strings::Printf("graph_%.2zu", i));
    TF_ASSERT_OK(writer->WriteGraphExecutionTrace(trace));
  }

  TF_ASSERT_OK(writer->FlushExecutionFiles());

  std::vector<DebugEvent> actuals;
  // Expect there to be only the last kCyclicBufferSize debug events,
  // and the order should be correct.
  ReadDebugEventProtos(writer, DebugEventFileType::GRAPH_EXECUTION_TRACES,
                       &actuals);
  // Fatal assertion: the loop below indexes actuals[0..kCyclicBufferSize).
  ASSERT_EQ(actuals.size(), kCyclicBufferSize);
  for (size_t i = 0; i < kCyclicBufferSize; ++i) {
    EXPECT_EQ(actuals[i].graph_execution_trace().tfdbg_context_id(),
              strings::Printf("graph_%.2zu", i + kCyclicBufferSize));
  }

  // Second, write more than the capacity of the circular buffer,
  // in a concurrent fashion.
  std::atomic_int_fast64_t counter(0);
  auto fn = [&writer, &counter]() {
    GraphExecutionTrace* trace = new GraphExecutionTrace();
    // int_fast64_t is not `long` on every platform, so convert explicitly to
    // the type that `%lld` expects.
    trace->set_tfdbg_context_id(strings::Printf(
        "new_graph_%.2lld", static_cast<long long>(counter.fetch_add(1))));
    TF_ASSERT_OK(writer->WriteGraphExecutionTrace(trace));
  };
  {
    thread::ThreadPool thread_pool(Env::Default(), "test_pool", 8);
    for (size_t i = 0; i < kCyclicBufferSize * 2; ++i) {
      thread_pool.Schedule(fn);
    }
    // The pool is destroyed at the end of this scope. The test relies on that
    // destruction to have every scheduled closure finished before Close()
    // (presumably the destructor drains pending work -- confirm in
    // threadpool.h).
  }
  TF_ASSERT_OK(writer->Close());

  ReadDebugEventProtos(writer, DebugEventFileType::GRAPH_EXECUTION_TRACES,
                       &actuals);
  // NOTE: This includes the events from the first stage above, because the
  // .graph_execution_traces file hasn't changed.
  // Fatal assertion: the loop below indexes up to 2 * kCyclicBufferSize - 1.
  ASSERT_EQ(actuals.size(), kCyclicBufferSize * 2);
  for (size_t i = 0; i < kCyclicBufferSize; ++i) {
    const size_t index = i + kCyclicBufferSize;
    // Concurrent writes have no guaranteed order, so only the prefix is
    // checked.
    EXPECT_EQ(actuals[index].graph_execution_trace().tfdbg_context_id().find(
                  "new_graph_"),
              0);
  }

  // Verify no cross-talk.
  ReadDebugEventProtos(writer, DebugEventFileType::SOURCE_FILES, &actuals);
  EXPECT_EQ(actuals.size(), 0);
  ReadDebugEventProtos(writer, DebugEventFileType::STACK_FRAMES, &actuals);
  EXPECT_EQ(actuals.size(), 0);
  ReadDebugEventProtos(writer, DebugEventFileType::GRAPHS, &actuals);
  EXPECT_EQ(actuals.size(), 0);
  ReadDebugEventProtos(writer, DebugEventFileType::EXECUTION, &actuals);
  EXPECT_EQ(actuals.size(), 0);
}
797
// Calls RegisterDeviceAndGetId() from several threads at once and checks both
// the returned IDs and the DebugEvents read back from the GRAPHS file.
TEST_F(DebugEventsWriterTest, RegisterDeviceAndGetIdTrace) {
  DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, DebugEventsWriter::kDefaultCyclicBufferSize);
  TF_ASSERT_OK(writer->Init());

  // Eight concurrent registrations spread over four distinct device names;
  // each closure writes only its own slot of `device_ids`.
  int device_ids[8];
  {
    thread::ThreadPool pool(Env::Default(), "test_pool", 8);
    for (int slot = 0; slot < 8; ++slot) {
      pool.Schedule([slot, writer, &device_ids]() {
        device_ids[slot] = writer->RegisterDeviceAndGetId(strings::Printf(
            "/job:localhost/replica:0/task:0/device:GPU:%d", slot % 4));
      });
    }
    // The pool goes away at the end of this scope, before any ID is read.
  }
  TF_ASSERT_OK(writer->FlushNonExecutionFiles());
  TF_ASSERT_OK(writer->Close());

  // Slots k and k + 4 registered the same device name, so their IDs must
  // agree.
  for (int k = 0; k < 4; ++k) {
    EXPECT_EQ(device_ids[k], device_ids[k + 4]);
  }
  // The four distinct names must map to four distinct IDs.
  const absl::flat_hash_set<int> unique_ids(device_ids, device_ids + 8);
  EXPECT_EQ(unique_ids.size(), 4);

  std::vector<DebugEvent> events;
  ReadDebugEventProtos(writer, DebugEventFileType::GRAPHS, &events);
  // One event per unique device name, regardless of how many threads
  // registered it.
  EXPECT_EQ(events.size(), 4);
  const size_t prefix_length =
      strlen("/job:localhost/replica:0/task:0/device:GPU:");
  for (const DebugEvent& event : events) {
    const auto& device = event.debugged_device();
    // Recover the GPU index from the tail of the device name to find the ID
    // that was handed out for it.
    int device_index = -1;
    CHECK(absl::SimpleAtoi(device.device_name().substr(prefix_length),
                           &device_index));
    EXPECT_EQ(device.device_id(), device_ids[device_index]);
  }
}
841
// Verifies that with a cyclic buffer size of 0 every written Execution and
// GraphExecutionTrace event reaches disk, in order, after a flush.
TEST_F(DebugEventsWriterTest, DisableCyclicBufferBehavior) {
  const size_t kCyclicBufferSize = 0;  // A value <= 0 disables cyclic behavior.
  DebugEventsWriter* writer = DebugEventsWriter::GetDebugEventsWriter(
      dump_root_, tfdbg_run_id_, kCyclicBufferSize);
  TF_ASSERT_OK(writer->Init());

  const size_t kNumEvents = 20;

  // Stage 1: Execution events, each tagged with its index as the single
  // input tensor ID.
  for (size_t i = 0; i < kNumEvents; ++i) {
    Execution* execution = new Execution();
    execution->set_op_type("Log");
    execution->add_input_tensor_ids(i);
    TF_ASSERT_OK(writer->WriteExecution(execution));
  }
  TF_ASSERT_OK(writer->FlushExecutionFiles());

  std::vector<DebugEvent> actuals;
  ReadDebugEventProtos(writer, DebugEventFileType::EXECUTION, &actuals);
  // Fatal assertion: the loop below indexes actuals[0..kNumEvents).
  ASSERT_EQ(actuals.size(), kNumEvents);
  for (size_t i = 0; i < kNumEvents; ++i) {
    EXPECT_EQ(actuals[i].execution().op_type(), "Log");
    // Fatal assertion: element 0 is read on the next line.
    ASSERT_EQ(actuals[i].execution().input_tensor_ids().size(), 1);
    EXPECT_EQ(actuals[i].execution().input_tensor_ids()[0], i);
  }

  // Stage 2: GraphExecutionTrace events, identified by a zero-padded index.
  for (size_t i = 0; i < kNumEvents; ++i) {
    GraphExecutionTrace* trace = new GraphExecutionTrace();
    // `%zu` is the portable conversion for size_t (`%ld` is not).
    trace->set_tfdbg_context_id(strings::Printf("graph_%.2zu", i));
    TF_ASSERT_OK(writer->WriteGraphExecutionTrace(trace));
  }
  TF_ASSERT_OK(writer->FlushExecutionFiles());

  ReadDebugEventProtos(writer, DebugEventFileType::GRAPH_EXECUTION_TRACES,
                       &actuals);
  // Fatal assertion: the loop below indexes actuals[0..kNumEvents).
  ASSERT_EQ(actuals.size(), kNumEvents);
  for (size_t i = 0; i < kNumEvents; ++i) {
    EXPECT_EQ(actuals[i].graph_execution_trace().tfdbg_context_id(),
              strings::Printf("graph_%.2zu", i));
  }

  // Close the writer so the files can be safely deleted.
  TF_ASSERT_OK(writer->Close());
}
885
886 } // namespace tfdbg
887 } // namespace tensorflow
888