/* Copyright 2018 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include "tensorflow/core/profiler/backends/cpu/traceme_recorder.h"

#include <stddef.h>

#include <algorithm>
#include <atomic>
#include <deque>
#include <memory>
#include <new>
#include <utility>
#include <vector>

#include "absl/container/flat_hash_map.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/types.h"
namespace tensorflow {
namespace profiler {
namespace internal {

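// Current trace level. Holds the trace level while a tracing session is
// active, and kTracingDisabled otherwise.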
std::atomic<int> g_trace_level(TraceMeRecorder::kTracingDisabled);

// The g_trace_level implementation must be lock-free for fast execution of the
// TraceMe API. The assertion below can be commented out if compilation fails,
// but execution might be slow (even when tracing is disabled).
static_assert(ATOMIC_INT_LOCK_FREE == 2, "Assumed atomic<int> was lock free");

}  // namespace internal

namespace {

// Track events created by ActivityStart and merge their data into events
// created by ActivityEnd. TraceMe records events in its destructor, so this
// results in complete events sorted by their end_time in the thread they ended.
// Within the same thread, the record created by ActivityStart must appear
// before the record created by ActivityEnd. Cross-thread events must be
// processed in a separate pass. A single map can be used because the
// activity_id is globally unique.
class SplitEventTracker {
 public:
  void AddStart(TraceMeRecorder::Event&& event) {
    DCHECK(event.IsStart());
    start_events_.emplace(event.ActivityId(), std::move(event));
  }

  void AddEnd(TraceMeRecorder::Event* event) {
    DCHECK(event->IsEnd());
    if (!FindStartAndMerge(event)) {
      end_events_.push_back(event);
    }
  }

  void HandleCrossThreadEvents() {
    for (auto* event : end_events_) {
      FindStartAndMerge(event);
    }
  }

 private:
  // Finds the start event matching the given end event and merges the start
  // event's data into the end event. Returns false if no match is found.
  bool FindStartAndMerge(TraceMeRecorder::Event* event) {
    auto iter = start_events_.find(event->ActivityId());
    if (iter == start_events_.end()) return false;
    auto& start_event = iter->second;
    event->name = std::move(start_event.name);
    event->start_time = start_event.start_time;
    start_events_.erase(iter);
    return true;
  }

  // Start events are collected from each ThreadLocalRecorder::Consume() call.
  // Their data is merged into end_events.
  absl::flat_hash_map<int64_t, TraceMeRecorder::Event> start_events_;

  // End events are stored in the output of TraceMeRecorder::Consume().
  std::vector<TraceMeRecorder::Event*> end_events_;
};

// A single-producer single-consumer queue of Events.
//
// Implemented as a linked-list of blocks containing numbered slots, with start
// and end pointers:
//
//  [ events........ | next-]--> [ events......... | next ]
//  ^start_block_ ^start_        ^end_block_  ^end_
//
// start_ is the first occupied slot, end_ is the first unoccupied slot.
//
// Push writes at end_, and then advances it, allocating a block if needed.
// Consume takes ownership of events in the range [start_, end_).
// Clear removes events in the range [start_, end_).
// The end_ pointer is atomic so Push and Consume can be concurrent.
//
// Push and Consume are lock-free and each might be called from at most one
// thread. Push is only called by the owner thread. Consume is only called by
// the tracing control thread.
//
// Thus, Consume might race with Push, so Consume only removes events that were
// in the queue when it was invoked. If Push is called while Consume is active,
// the new event remains in the queue. Thus, the tracing control thread should
// call Consume when tracing stops to remove events created during tracing, and
// Clear when tracing starts again to remove any remaining events.
class EventQueue {
 public:
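  // Constructs an empty queue consisting of a single block. start_ and end_
  // both refer to slot 0 of that block.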
  EventQueue()
      : start_block_(new Block{/*start=*/0, /*next=*/nullptr}),
        start_(start_block_->start),
        end_block_(start_block_),
        end_(start_) {}

  // Memory is deallocated and trace events are destroyed on destruction.
  // This does not require a global lock: all stored trace events are
  // discarded, and we assume this instance is destroyed only after the last
  // Push() has been called.
  ~EventQueue() {
    Clear();
    DCHECK(Empty());
    delete end_block_;
  }

  // Add a new event to the back of the queue. Fast and lock-free.
  void Push(TraceMeRecorder::Event&& event) {
    size_t end = end_.load(std::memory_order_relaxed);
    new (&end_block_->events[end++ - end_block_->start].event)
        TraceMeRecorder::Event(std::move(event));
    if (TF_PREDICT_FALSE(end - end_block_->start == Block::kNumSlots)) {
      auto* new_block = new Block{end, nullptr};
      end_block_->next = new_block;
      end_block_ = new_block;
    }
    end_.store(end, std::memory_order_release);  // Write index after contents.
  }

  // Removes all events from the queue.
  void Clear() {
    size_t end = end_.load(std::memory_order_acquire);
    while (start_ != end) {
      Pop();
    }
  }

  // Retrieves and removes all events in the queue at the time of invocation.
  // If Push is called while Consume is active, the new event will not be
  // removed from the queue.
  // Consume is only called from ThreadLocalRecorder::Consume, which in turn is
  // only called while TraceMeRecorder::mutex_ is held, so Consume has a single
  // caller at a time.
  TF_MUST_USE_RESULT std::deque<TraceMeRecorder::Event> Consume(
      SplitEventTracker* split_event_tracker) {
    // Read index before contents.
    size_t end = end_.load(std::memory_order_acquire);
    std::deque<TraceMeRecorder::Event> result;
    while (start_ != end) {
      TraceMeRecorder::Event event = Pop();
      // Copy data from start events to end events. TraceMe records events in
      // its destructor, so this results in complete events sorted by their
      // end_time in the thread they ended. Within the same thread, the start
      // event must appear before the corresponding end event.
      if (event.IsStart()) {
        split_event_tracker->AddStart(std::move(event));
        continue;
      }
      result.emplace_back(std::move(event));
      if (result.back().IsEnd()) {
        split_event_tracker->AddEnd(&result.back());
      }
    }
    return result;
  }

 private:
  // Returns true if the queue is empty at the time of invocation.
  bool Empty() const {
    return (start_ == end_.load(std::memory_order_acquire));
  }

  // Remove one event off the front of the queue and return it.
  // REQUIRES: The queue must not be empty.
  TraceMeRecorder::Event Pop() {
    DCHECK(!Empty());
    // Move the next event into the output.
    auto& event = start_block_->events[start_++ - start_block_->start].event;
    TraceMeRecorder::Event out = std::move(event);
    event.~Event();  // Events must be individually destroyed.
    // If we reach the end of a block, we own it and should delete it.
    // The next block is present: end always points to something.
    if (TF_PREDICT_FALSE(start_ - start_block_->start == Block::kNumSlots)) {
      auto* next_block = start_block_->next;
      delete start_block_;
      start_block_ = next_block;
      DCHECK_EQ(start_, start_block_->start);
    }
    return out;
  }

  struct Block {
    // The number of slots in a block is chosen so the block fits in 64 KiB.
    static constexpr size_t kSize = 1 << 16;
    static constexpr size_t kNumSlots =
        (kSize - (sizeof(size_t) + sizeof(Block*))) /
        sizeof(TraceMeRecorder::Event);

    size_t start;  // The number of the first slot.
    Block* next;
    // Defer construction of Event until the data is available.
    // Must also destroy manually, as the block may not fill entirely.
    union MaybeEvent {
      MaybeEvent() {}
      ~MaybeEvent() {}
      TraceMeRecorder::Event event;
    } events[kNumSlots];
  };

  static_assert(sizeof(Block) <= Block::kSize, "");

  // Head of list for reading. Only accessed by consumer thread.
  Block* start_block_;
  size_t start_;
  // Tail of list for writing. Accessed by producer thread.
  Block* end_block_;
  std::atomic<size_t> end_;  // Atomic: also read by consumer thread.
};

}  // namespace

// To avoid unnecessary synchronization between threads, each thread has a
// ThreadLocalRecorder that independently records its events.
class TraceMeRecorder::ThreadLocalRecorder {
 public:
  // The recorder is created the first time TraceMeRecorder::Record() is called
  // on a thread.
  ThreadLocalRecorder() {
    auto* env = Env::Default();
    info_.tid = env->GetCurrentThreadId();
    env->GetCurrentThreadName(&info_.name);
  }

  uint32 ThreadId() const { return info_.tid; }

  // IsActive is called from the control thread.
  bool IsActive() const { return active_.load(std::memory_order_acquire); }
  // SetInactive is called when the owner thread is destroyed.
  void SetInactive() { active_.store(0, std::memory_order_release); }

  // Record is only called from the owner thread.
  void Record(TraceMeRecorder::Event&& event) { queue_.Push(std::move(event)); }

  // Clear is called from the control thread when tracing starts to remove any
  // elements added due to Record racing with Consume.
  void Clear() { queue_.Clear(); }

  // Consume is called from the control thread when tracing stops.
  TF_MUST_USE_RESULT TraceMeRecorder::ThreadEvents Consume(
      SplitEventTracker* split_event_tracker) {
    return {info_, queue_.Consume(split_event_tracker)};
  }

 private:
  TraceMeRecorder::ThreadInfo info_;
  EventQueue queue_;
  std::atomic<int> active_{1};  // std::atomic<bool> is not always lock-free.
};

// An instance of this wrapper is allocated in thread_local storage.
// It creates the ThreadLocalRecorder and notifies TraceMeRecorder when the
// first TraceMe on the thread is executed while tracing is active, or when
// the thread is destroyed.
class TraceMeRecorder::ThreadLocalRecorderWrapper {
 public:
  ThreadLocalRecorderWrapper()
      : recorder_(std::make_shared<TraceMeRecorder::ThreadLocalRecorder>()) {
    TraceMeRecorder::Get()->RegisterThread(recorder_->ThreadId(), recorder_);
  }

  void Record(TraceMeRecorder::Event&& event) {
    recorder_->Record(std::move(event));
  }

  ~ThreadLocalRecorderWrapper() {
    recorder_->SetInactive();
    TraceMeRecorder::Get()->UnregisterThread(recorder_->ThreadId());
  }

 private:
  // Ownership of ThreadLocalRecorder is shared with TraceMeRecorder.
  // If a thread is destroyed during tracing, its ThreadLocalRecorder is kept
  // alive until the end of tracing.
  std::shared_ptr<TraceMeRecorder::ThreadLocalRecorder> recorder_;
};

/*static*/ TraceMeRecorder* TraceMeRecorder::Get() {
  static TraceMeRecorder* singleton = new TraceMeRecorder;
  return singleton;
}

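// Called from the ThreadLocalRecorderWrapper constructor, i.e. the first time
// a thread records an event, to share ownership of its ThreadLocalRecorder.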
void TraceMeRecorder::RegisterThread(
    uint32 tid, std::shared_ptr<ThreadLocalRecorder> thread) {
  mutex_lock lock(mutex_);
  threads_.insert_or_assign(tid, std::move(thread));
}

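// Called from the ThreadLocalRecorderWrapper destructor when its thread exits.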
void TraceMeRecorder::UnregisterThread(uint32 tid) {
  // If tracing is active, keep the ThreadLocalRecorder alive.
  if (Active()) return;
  // If tracing is inactive, destroy the ThreadLocalRecorder.
  mutex_lock lock(mutex_);
  threads_.erase(tid);
}

// This method is performance critical and should be kept fast. It is called
// when tracing starts. The mutex is held, so no threads can be
// registered/unregistered. This ensures only the control thread calls
// ThreadLocalRecorder::Clear().
void TraceMeRecorder::Clear() {
  for (auto& id_and_recorder : threads_) {
    auto& recorder = id_and_recorder.second;
    recorder->Clear();
    // We should not have an inactive ThreadLocalRecorder here. If a thread is
    // destroyed while tracing is inactive, its ThreadLocalRecorder is removed
    // in UnregisterThread.
    DCHECK(recorder->IsActive());
  }
}

// This method is performance critical and should be kept fast. It is called
// when tracing stops. The mutex is held, so no threads can be
// registered/unregistered. This ensures only the control thread calls
// ThreadLocalRecorder::Consume().
TraceMeRecorder::Events TraceMeRecorder::Consume() {
  TraceMeRecorder::Events result;
  result.reserve(threads_.size());
  SplitEventTracker split_event_tracker;
  for (auto iter = threads_.begin(); iter != threads_.end();) {
    auto& recorder = iter->second;
    TraceMeRecorder::ThreadEvents events =
        recorder->Consume(&split_event_tracker);
    if (!events.events.empty()) {
      result.push_back(std::move(events));
    }
    // We can have an inactive ThreadLocalRecorder here. If a thread is
    // destroyed while tracing is active, its ThreadLocalRecorder is kept alive
    // by UnregisterThread and is removed here instead.
    if (!recorder->IsActive()) {
      threads_.erase(iter++);
    } else {
      ++iter;
    }
  }
  split_event_tracker.HandleCrossThreadEvents();
  return result;
}

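// Enables tracing at the given level if tracing is not already active. Returns
// true if this call started a new tracing session.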
bool TraceMeRecorder::StartRecording(int level) {
  level = std::max(0, level);
  mutex_lock lock(mutex_);
  // Change g_trace_level while holding mutex_.
  int expected = kTracingDisabled;
  bool started = internal::g_trace_level.compare_exchange_strong(
      expected, level, std::memory_order_acq_rel);
  if (started) {
    // We may have old events in buffers because Record() raced with Stop().
    Clear();
  }
  return started;
}

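// Records an event into the calling thread's queue. The thread_local
// ThreadLocalRecorderWrapper is constructed (and the thread registered) on the
// first call from each thread.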
void TraceMeRecorder::Record(Event&& event) {
  static thread_local ThreadLocalRecorderWrapper thread_local_recorder;
  thread_local_recorder.Record(std::move(event));
}

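// Disables tracing and gathers the recorded events from all registered
// threads. Returns an empty result if tracing was not active.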
TraceMeRecorder::Events TraceMeRecorder::StopRecording() {
  TraceMeRecorder::Events events;
  mutex_lock lock(mutex_);
  // Change g_trace_level while holding mutex_.
  if (internal::g_trace_level.exchange(
          kTracingDisabled, std::memory_order_acq_rel) != kTracingDisabled) {
    events = Consume();
  }
  return events;
}

/*static*/ int64_t TraceMeRecorder::NewActivityId() {
  // Activity IDs: To avoid contention over a counter, the top 32 bits identify
  // the originating thread, the bottom 32 bits name the event within a thread.
  // IDs may be reused after 4 billion events on one thread, or 2 billion
  // threads.
  static std::atomic<int32> thread_counter(1);  // avoid kUntracedActivity
  const thread_local static int32_t thread_id =
      thread_counter.fetch_add(1, std::memory_order_relaxed);
  thread_local static uint32 per_thread_activity_id = 0;
  return static_cast<int64_t>(thread_id) << 32 | per_thread_activity_id++;
}

}  // namespace profiler
}  // namespace tensorflow