1 /*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <algorithm>
18 #include <cstddef>
19 #include <cstdint>
20 #include <cstdlib>
21 #include <cstring>
22 #include <limits>
23 #include <memory>
24 #include <utility>
25
26 #include "perfetto/base/compiler.h"
27 #include "perfetto/base/logging.h"
28 #include "perfetto/public/compiler.h"
29 #include "perfetto/trace_processor/trace_blob_view.h"
30 #include "src/trace_processor/importers/android_bugreport/android_dumpstate_event.h"
31 #include "src/trace_processor/importers/android_bugreport/android_log_event.h"
32 #include "src/trace_processor/importers/art_method/art_method_event.h"
33 #include "src/trace_processor/importers/common/parser_types.h"
34 #include "src/trace_processor/importers/common/trace_parser.h"
35 #include "src/trace_processor/importers/fuchsia/fuchsia_record.h"
36 #include "src/trace_processor/importers/gecko/gecko_event.h"
37 #include "src/trace_processor/importers/instruments/row.h"
38 #include "src/trace_processor/importers/perf/record.h"
39 #include "src/trace_processor/importers/perf_text/perf_text_event.h"
40 #include "src/trace_processor/sorter/trace_sorter.h"
41 #include "src/trace_processor/sorter/trace_token_buffer.h"
42 #include "src/trace_processor/storage/stats.h"
43 #include "src/trace_processor/types/trace_processor_context.h"
44 #include "src/trace_processor/util/bump_allocator.h"
45
46 namespace perfetto::trace_processor {
47
TraceSorter(TraceProcessorContext * context,SortingMode sorting_mode,EventHandling event_handling)48 TraceSorter::TraceSorter(TraceProcessorContext* context,
49 SortingMode sorting_mode,
50 EventHandling event_handling)
51 : sorting_mode_(sorting_mode),
52 storage_(context->storage),
53 event_handling_(event_handling) {
54 AddMachineContext(context);
55 }
56
~TraceSorter()57 TraceSorter::~TraceSorter() {
58 // If trace processor encountered a fatal error, it's possible for some events
59 // to have been pushed without evicting them by pushing to the next stage. Do
60 // that now.
61 for (auto& sorter_data : sorter_data_by_machine_) {
62 for (auto& queue : sorter_data.queues) {
63 for (const auto& event : queue.events_) {
64 ExtractAndDiscardTokenizedObject(event);
65 }
66 }
67 }
68 }
69
Sort(TraceTokenBuffer & buffer,bool use_slow_sorting)70 void TraceSorter::Queue::Sort(TraceTokenBuffer& buffer, bool use_slow_sorting) {
71 PERFETTO_DCHECK(needs_sorting());
72 PERFETTO_DCHECK(sort_start_idx_ < events_.size());
73
74 // If sort_min_ts_ has been set, it will no long be max_int, and so will be
75 // smaller than max_ts_.
76 PERFETTO_DCHECK(sort_min_ts_ < std::numeric_limits<int64_t>::max());
77
78 // We know that all events between [0, sort_start_idx_] are sorted. Within
79 // this range, perform a bound search and find the iterator for the min
80 // timestamp that broke the monotonicity. Re-sort from there to the end.
81 auto sort_end = events_.begin() + static_cast<ssize_t>(sort_start_idx_);
82 if (use_slow_sorting) {
83 PERFETTO_DCHECK(sort_min_ts_ <= max_ts_);
84 PERFETTO_DCHECK(std::is_sorted(events_.begin(), sort_end,
85 TimestampedEvent::SlowOperatorLess{buffer}));
86 } else {
87 PERFETTO_DCHECK(sort_min_ts_ < max_ts_);
88 PERFETTO_DCHECK(std::is_sorted(events_.begin(), sort_end));
89 }
90 auto sort_begin = std::lower_bound(events_.begin(), sort_end, sort_min_ts_,
91 &TimestampedEvent::Compare);
92 if (use_slow_sorting) {
93 std::sort(sort_begin, events_.end(),
94 TimestampedEvent::SlowOperatorLess{buffer});
95 } else {
96 std::sort(sort_begin, events_.end());
97 }
98 sort_start_idx_ = 0;
99 sort_min_ts_ = 0;
100
101 // At this point |events_| must be fully sorted
102 if (use_slow_sorting) {
103 PERFETTO_DCHECK(std::is_sorted(events_.begin(), events_.end(),
104 TimestampedEvent::SlowOperatorLess{buffer}));
105 } else {
106 PERFETTO_DCHECK(std::is_sorted(events_.begin(), events_.end()));
107 }
108 }
109
110 // Removes all the events in |queues_| that are earlier than the given
111 // packet index and moves them to the next parser stages, respecting global
112 // timestamp order. This function is a "extract min from N sorted queues", with
113 // some little cleverness: we know that events tend to be bursty, so events are
114 // not going to be randomly distributed on the N |queues_|.
115 // Upon each iteration this function finds the first two queues (if any) that
116 // have the oldest events, and extracts events from the 1st until hitting the
117 // min_ts of the 2nd. Imagine the queues are as follows:
118 //
119 // q0 {min_ts: 10 max_ts: 30}
120 // q1 {min_ts:5 max_ts: 35}
121 // q2 {min_ts: 12 max_ts: 40}
122 //
123 // We know that we can extract all events from q1 until we hit ts=10 without
124 // looking at any other queue. After hitting ts=10, we need to re-look to all of
125 // them to figure out the next min-event.
126 // There are more suitable data structures to do this (e.g. keeping a min-heap
127 // to avoid re-scanning all the queues all the times) but doesn't seem worth it.
128 // With Android traces (that have 8 CPUs) this function accounts for ~1-3% cpu
129 // time in a profiler.
SortAndExtractEventsUntilAllocId(BumpAllocator::AllocId limit_alloc_id)130 void TraceSorter::SortAndExtractEventsUntilAllocId(
131 BumpAllocator::AllocId limit_alloc_id) {
132 constexpr int64_t kTsMax = std::numeric_limits<int64_t>::max();
133 for (;;) {
134 size_t min_machine_idx = 0;
135 size_t min_queue_idx = 0; // The index of the queue with the min(ts).
136
137 // The top-2 min(ts) among all queues.
138 // queues_[min_queue_idx].events.timestamp == min_queue_ts[0].
139 int64_t min_queue_ts[2]{kTsMax, kTsMax};
140
141 // This loop identifies the queue which starts with the earliest event and
142 // also remembers the earliest event of the 2nd queue (in min_queue_ts[1]).
143 bool all_queues_empty = true;
144 for (size_t m = 0; m < sorter_data_by_machine_.size(); m++) {
145 TraceSorterData& sorter_data = sorter_data_by_machine_[m];
146 for (size_t i = 0; i < sorter_data.queues.size(); i++) {
147 auto& queue = sorter_data.queues[i];
148 if (queue.events_.empty())
149 continue;
150 PERFETTO_DCHECK(queue.max_ts_ <= append_max_ts_);
151
152 // Checking for |all_queues_empty| is necessary here as in fuzzer cases
153 // we can end up with |int64::max()| as the value here.
154 // See https://crbug.com/oss-fuzz/69164 for an example.
155 if (all_queues_empty || queue.min_ts_ < min_queue_ts[0]) {
156 min_queue_ts[1] = min_queue_ts[0];
157 min_queue_ts[0] = queue.min_ts_;
158 min_queue_idx = i;
159 min_machine_idx = m;
160 } else if (queue.min_ts_ < min_queue_ts[1]) {
161 min_queue_ts[1] = queue.min_ts_;
162 }
163 all_queues_empty = false;
164 }
165 }
166 if (all_queues_empty)
167 break;
168
169 auto& sorter_data = sorter_data_by_machine_[min_machine_idx];
170 auto& queue = sorter_data.queues[min_queue_idx];
171 auto& events = queue.events_;
172 if (queue.needs_sorting())
173 queue.Sort(token_buffer_, use_slow_sorting_);
174 PERFETTO_DCHECK(queue.min_ts_ == events.front().ts);
175
176 // Now that we identified the min-queue, extract all events from it until
177 // we hit either: (1) the min-ts of the 2nd queue or (2) the packet index
178 // limit, whichever comes first.
179 size_t num_extracted = 0;
180 for (auto& event : events) {
181 if (event.alloc_id() >= limit_alloc_id) {
182 break;
183 }
184
185 if (event.ts > min_queue_ts[1]) {
186 // We should never hit this condition on the first extraction as by
187 // the algorithm above (event.ts =) min_queue_ts[0] <= min_queue[1].
188 PERFETTO_DCHECK(num_extracted > 0);
189 break;
190 }
191
192 ++num_extracted;
193 MaybeExtractEvent(min_machine_idx, min_queue_idx, event);
194 } // for (event: events)
195
196 // The earliest event cannot be extracted without going past the limit.
197 if (!num_extracted)
198 break;
199
200 // Now remove the entries from the event buffer and update the queue-local
201 // and global time bounds.
202 events.erase_front(num_extracted);
203 events.shrink_to_fit();
204
205 // Since we likely just removed a bunch of items try to reduce the memory
206 // usage of the token buffer.
207 token_buffer_.FreeMemory();
208
209 // Update the queue timestamps to reflect the bounds after extraction.
210 if (events.empty()) {
211 queue.min_ts_ = kTsMax;
212 queue.max_ts_ = 0;
213 } else {
214 queue.min_ts_ = queue.events_.front().ts;
215 }
216 } // for(;;)
217 }
218
ParseTracePacket(TraceProcessorContext & context,const TimestampedEvent & event)219 void TraceSorter::ParseTracePacket(TraceProcessorContext& context,
220 const TimestampedEvent& event) {
221 TraceTokenBuffer::Id id = GetTokenBufferId(event);
222 switch (event.type()) {
223 case TimestampedEvent::Type::kPerfRecord:
224 context.perf_record_parser->ParsePerfRecord(
225 event.ts, token_buffer_.Extract<perf_importer::Record>(id));
226 return;
227 case TimestampedEvent::Type::kInstrumentsRow:
228 context.instruments_row_parser->ParseInstrumentsRow(
229 event.ts, token_buffer_.Extract<instruments_importer::Row>(id));
230 return;
231 case TimestampedEvent::Type::kTracePacket:
232 context.proto_trace_parser->ParseTracePacket(
233 event.ts, token_buffer_.Extract<TracePacketData>(id));
234 return;
235 case TimestampedEvent::Type::kTrackEvent:
236 context.proto_trace_parser->ParseTrackEvent(
237 event.ts, token_buffer_.Extract<TrackEventData>(id));
238 return;
239 case TimestampedEvent::Type::kFuchsiaRecord:
240 context.fuchsia_record_parser->ParseFuchsiaRecord(
241 event.ts, token_buffer_.Extract<FuchsiaRecord>(id));
242 return;
243 case TimestampedEvent::Type::kJsonValue:
244 context.json_trace_parser->ParseJsonPacket(
245 event.ts, std::move(token_buffer_.Extract<JsonEvent>(id).value));
246 return;
247 case TimestampedEvent::Type::kJsonValueWithDur:
248 context.json_trace_parser->ParseJsonPacket(
249 event.ts,
250 std::move(token_buffer_.Extract<JsonWithDurEvent>(id).value));
251 return;
252 case TimestampedEvent::Type::kSpeRecord:
253 context.spe_record_parser->ParseSpeRecord(
254 event.ts, token_buffer_.Extract<TraceBlobView>(id));
255 return;
256 case TimestampedEvent::Type::kSystraceLine:
257 context.json_trace_parser->ParseSystraceLine(
258 event.ts, token_buffer_.Extract<SystraceLine>(id));
259 return;
260 case TimestampedEvent::Type::kAndroidDumpstateEvent:
261 context.android_dumpstate_event_parser->ParseAndroidDumpstateEvent(
262 event.ts, token_buffer_.Extract<AndroidDumpstateEvent>(id));
263 return;
264 case TimestampedEvent::Type::kAndroidLogEvent:
265 context.android_log_event_parser->ParseAndroidLogEvent(
266 event.ts, token_buffer_.Extract<AndroidLogEvent>(id));
267 return;
268 case TimestampedEvent::Type::kLegacyV8CpuProfileEvent:
269 context.json_trace_parser->ParseLegacyV8ProfileEvent(
270 event.ts, token_buffer_.Extract<LegacyV8CpuProfileEvent>(id));
271 return;
272 case TimestampedEvent::Type::kGeckoEvent:
273 context.gecko_trace_parser->ParseGeckoEvent(
274 event.ts, token_buffer_.Extract<gecko_importer::GeckoEvent>(id));
275 return;
276 case TimestampedEvent::Type::kArtMethodEvent:
277 context.art_method_parser->ParseArtMethodEvent(
278 event.ts, token_buffer_.Extract<art_method::ArtMethodEvent>(id));
279 return;
280 case TimestampedEvent::Type::kPerfTextEvent:
281 context.perf_text_parser->ParsePerfTextEvent(
282 event.ts,
283 token_buffer_.Extract<perf_text_importer::PerfTextEvent>(id));
284 return;
285 case TimestampedEvent::Type::kInlineSchedSwitch:
286 case TimestampedEvent::Type::kInlineSchedWaking:
287 case TimestampedEvent::Type::kEtwEvent:
288 case TimestampedEvent::Type::kFtraceEvent:
289 PERFETTO_FATAL("Invalid event type");
290 }
291 PERFETTO_FATAL("For GCC");
292 }
293
ParseEtwPacket(TraceProcessorContext & context,uint32_t cpu,const TimestampedEvent & event)294 void TraceSorter::ParseEtwPacket(TraceProcessorContext& context,
295 uint32_t cpu,
296 const TimestampedEvent& event) {
297 TraceTokenBuffer::Id id = GetTokenBufferId(event);
298 switch (static_cast<TimestampedEvent::Type>(event.event_type)) {
299 case TimestampedEvent::Type::kEtwEvent:
300 context.proto_trace_parser->ParseEtwEvent(
301 cpu, event.ts, token_buffer_.Extract<TracePacketData>(id));
302 return;
303 case TimestampedEvent::Type::kInlineSchedSwitch:
304 case TimestampedEvent::Type::kInlineSchedWaking:
305 case TimestampedEvent::Type::kFtraceEvent:
306 case TimestampedEvent::Type::kTrackEvent:
307 case TimestampedEvent::Type::kSpeRecord:
308 case TimestampedEvent::Type::kSystraceLine:
309 case TimestampedEvent::Type::kTracePacket:
310 case TimestampedEvent::Type::kPerfRecord:
311 case TimestampedEvent::Type::kInstrumentsRow:
312 case TimestampedEvent::Type::kJsonValue:
313 case TimestampedEvent::Type::kJsonValueWithDur:
314 case TimestampedEvent::Type::kFuchsiaRecord:
315 case TimestampedEvent::Type::kAndroidDumpstateEvent:
316 case TimestampedEvent::Type::kAndroidLogEvent:
317 case TimestampedEvent::Type::kLegacyV8CpuProfileEvent:
318 case TimestampedEvent::Type::kGeckoEvent:
319 case TimestampedEvent::Type::kArtMethodEvent:
320 case TimestampedEvent::Type::kPerfTextEvent:
321 PERFETTO_FATAL("Invalid event type");
322 }
323 PERFETTO_FATAL("For GCC");
324 }
325
ParseFtracePacket(TraceProcessorContext & context,uint32_t cpu,const TimestampedEvent & event)326 void TraceSorter::ParseFtracePacket(TraceProcessorContext& context,
327 uint32_t cpu,
328 const TimestampedEvent& event) {
329 TraceTokenBuffer::Id id = GetTokenBufferId(event);
330 switch (static_cast<TimestampedEvent::Type>(event.event_type)) {
331 case TimestampedEvent::Type::kInlineSchedSwitch:
332 context.proto_trace_parser->ParseInlineSchedSwitch(
333 cpu, event.ts, token_buffer_.Extract<InlineSchedSwitch>(id));
334 return;
335 case TimestampedEvent::Type::kInlineSchedWaking:
336 context.proto_trace_parser->ParseInlineSchedWaking(
337 cpu, event.ts, token_buffer_.Extract<InlineSchedWaking>(id));
338 return;
339 case TimestampedEvent::Type::kFtraceEvent:
340 context.proto_trace_parser->ParseFtraceEvent(
341 cpu, event.ts, token_buffer_.Extract<TracePacketData>(id));
342 return;
343 case TimestampedEvent::Type::kEtwEvent:
344 case TimestampedEvent::Type::kTrackEvent:
345 case TimestampedEvent::Type::kSpeRecord:
346 case TimestampedEvent::Type::kSystraceLine:
347 case TimestampedEvent::Type::kTracePacket:
348 case TimestampedEvent::Type::kPerfRecord:
349 case TimestampedEvent::Type::kInstrumentsRow:
350 case TimestampedEvent::Type::kJsonValue:
351 case TimestampedEvent::Type::kJsonValueWithDur:
352 case TimestampedEvent::Type::kFuchsiaRecord:
353 case TimestampedEvent::Type::kAndroidDumpstateEvent:
354 case TimestampedEvent::Type::kAndroidLogEvent:
355 case TimestampedEvent::Type::kLegacyV8CpuProfileEvent:
356 case TimestampedEvent::Type::kGeckoEvent:
357 case TimestampedEvent::Type::kArtMethodEvent:
358 case TimestampedEvent::Type::kPerfTextEvent:
359 PERFETTO_FATAL("Invalid event type");
360 }
361 PERFETTO_FATAL("For GCC");
362 }
363
ExtractAndDiscardTokenizedObject(const TimestampedEvent & event)364 void TraceSorter::ExtractAndDiscardTokenizedObject(
365 const TimestampedEvent& event) {
366 TraceTokenBuffer::Id id = GetTokenBufferId(event);
367 switch (static_cast<TimestampedEvent::Type>(event.event_type)) {
368 case TimestampedEvent::Type::kTracePacket:
369 case TimestampedEvent::Type::kFtraceEvent:
370 case TimestampedEvent::Type::kEtwEvent:
371 base::ignore_result(token_buffer_.Extract<TracePacketData>(id));
372 return;
373 case TimestampedEvent::Type::kTrackEvent:
374 base::ignore_result(token_buffer_.Extract<TrackEventData>(id));
375 return;
376 case TimestampedEvent::Type::kFuchsiaRecord:
377 base::ignore_result(token_buffer_.Extract<FuchsiaRecord>(id));
378 return;
379 case TimestampedEvent::Type::kJsonValue:
380 base::ignore_result(token_buffer_.Extract<JsonEvent>(id));
381 return;
382 case TimestampedEvent::Type::kJsonValueWithDur:
383 base::ignore_result(token_buffer_.Extract<JsonWithDurEvent>(id));
384 return;
385 case TimestampedEvent::Type::kSpeRecord:
386 base::ignore_result(token_buffer_.Extract<TraceBlobView>(id));
387 return;
388 case TimestampedEvent::Type::kSystraceLine:
389 base::ignore_result(token_buffer_.Extract<SystraceLine>(id));
390 return;
391 case TimestampedEvent::Type::kInlineSchedSwitch:
392 base::ignore_result(token_buffer_.Extract<InlineSchedSwitch>(id));
393 return;
394 case TimestampedEvent::Type::kInlineSchedWaking:
395 base::ignore_result(token_buffer_.Extract<InlineSchedWaking>(id));
396 return;
397 case TimestampedEvent::Type::kPerfRecord:
398 base::ignore_result(token_buffer_.Extract<perf_importer::Record>(id));
399 return;
400 case TimestampedEvent::Type::kInstrumentsRow:
401 base::ignore_result(token_buffer_.Extract<instruments_importer::Row>(id));
402 return;
403 case TimestampedEvent::Type::kAndroidDumpstateEvent:
404 base::ignore_result(token_buffer_.Extract<AndroidDumpstateEvent>(id));
405 return;
406 case TimestampedEvent::Type::kAndroidLogEvent:
407 base::ignore_result(token_buffer_.Extract<AndroidLogEvent>(id));
408 return;
409 case TimestampedEvent::Type::kLegacyV8CpuProfileEvent:
410 base::ignore_result(token_buffer_.Extract<LegacyV8CpuProfileEvent>(id));
411 return;
412 case TimestampedEvent::Type::kGeckoEvent:
413 base::ignore_result(
414 token_buffer_.Extract<gecko_importer::GeckoEvent>(id));
415 return;
416 case TimestampedEvent::Type::kArtMethodEvent:
417 base::ignore_result(
418 token_buffer_.Extract<art_method::ArtMethodEvent>(id));
419 return;
420 case TimestampedEvent::Type::kPerfTextEvent:
421 base::ignore_result(
422 token_buffer_.Extract<perf_text_importer::PerfTextEvent>(id));
423 return;
424 }
425 PERFETTO_FATAL("For GCC");
426 }
427
MaybeExtractEvent(size_t min_machine_idx,size_t queue_idx,const TimestampedEvent & event)428 void TraceSorter::MaybeExtractEvent(size_t min_machine_idx,
429 size_t queue_idx,
430 const TimestampedEvent& event) {
431 auto* machine_context =
432 sorter_data_by_machine_[min_machine_idx].machine_context;
433 int64_t timestamp = event.ts;
434 if (timestamp < latest_pushed_event_ts_)
435 storage_->IncrementStats(stats::sorter_push_event_out_of_order);
436
437 latest_pushed_event_ts_ = std::max(latest_pushed_event_ts_, timestamp);
438
439 if (PERFETTO_UNLIKELY(event_handling_ == EventHandling::kSortAndDrop)) {
440 // Parse* would extract this event and push it to the next stage. Since we
441 // are skipping that, just extract and discard it.
442 ExtractAndDiscardTokenizedObject(event);
443 return;
444 }
445 PERFETTO_DCHECK(event_handling_ == EventHandling::kSortAndPush);
446
447 if (queue_idx == 0) {
448 ParseTracePacket(*machine_context, event);
449 } else {
450 // Ftrace queues start at offset 1. So queues_[1] = cpu[0] and so on.
451 uint32_t cpu = static_cast<uint32_t>(queue_idx - 1);
452 auto event_type = static_cast<TimestampedEvent::Type>(event.event_type);
453
454 if (event_type == TimestampedEvent::Type::kEtwEvent) {
455 ParseEtwPacket(*machine_context, static_cast<uint32_t>(cpu), event);
456 } else {
457 ParseFtracePacket(*machine_context, cpu, event);
458 }
459 }
460 }
461
462 } // namespace perfetto::trace_processor
463