/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef SRC_TRACE_PROCESSOR_SORTER_TRACE_SORTER_H_
#define SRC_TRACE_PROCESSOR_SORTER_TRACE_SORTER_H_

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
#include <optional>
#include <string>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>

#include "perfetto/base/logging.h"
#include "perfetto/ext/base/circular_queue.h"
#include "perfetto/public/compiler.h"
#include "perfetto/trace_processor/ref_counted.h"
#include "perfetto/trace_processor/trace_blob_view.h"
#include "src/trace_processor/importers/android_bugreport/android_dumpstate_event.h"
#include "src/trace_processor/importers/android_bugreport/android_log_event.h"
#include "src/trace_processor/importers/art_method/art_method_event.h"
#include "src/trace_processor/importers/common/parser_types.h"
#include "src/trace_processor/importers/common/trace_parser.h"
#include "src/trace_processor/importers/fuchsia/fuchsia_record.h"
#include "src/trace_processor/importers/gecko/gecko_event.h"
#include "src/trace_processor/importers/instruments/row.h"
#include "src/trace_processor/importers/perf/record.h"
#include "src/trace_processor/importers/perf_text/perf_text_event.h"
#include "src/trace_processor/importers/systrace/systrace_line.h"
#include "src/trace_processor/sorter/trace_token_buffer.h"
#include "src/trace_processor/storage/trace_storage.h"
#include "src/trace_processor/types/trace_processor_context.h"
#include "src/trace_processor/util/bump_allocator.h"

namespace perfetto::trace_processor {

// This class takes care of sorting events parsed from the trace stream in
// arbitrary order and pushing them to the next pipeline stage (parsing) in
// order. To support streaming use-cases, sorting happens within a window.
//
// Events are held in the TraceSorter staging area (events_) until either:
// 1. We can determine that it's safe to extract events, by observing
//    TracingServiceEvent Flush and ReadBuffer events, or
// 2. The trace EOF is reached.
//
// Incremental extraction
//
// Incremental extraction happens by using a combination of flush and read
// buffer events from the tracing service. Note that incremental extraction
// is only applicable for write_into_file traces; ring-buffer traces will
// implicitly be sorted fully in memory because there is only a single read
// buffer call at the end.
//
// The algorithm for incremental extraction is explained in detail at
// go/trace-sorting-is-complicated.
//
// Sorting algorithm
//
// The sorting algorithm is designed around the assumption that:
// - Most events come from ftrace.
// - Ftrace events are sorted within each cpu most of the time.
//
// Because of this, this class operates as a streaming merge-sort of N+1
// queues (N = num cpus, plus one queue for non-ftrace events). Each queue in
// turn gets sorted (if necessary) before proceeding with the global
// merge-sort-extract.
//
// When an event is pushed through, it is just appended to the end of one of
// the queues. While appending, we keep track of whether the queue is still
// ordered or has just lost ordering. When an out-of-order event is detected
// on a queue we keep track of: (1) the offset within the queue where the
// disorder began, (2) the timestamp that broke the ordering.
//
// When we decide to extract events from the queues into the next stages of
// the trace processor, we re-sort the events in the queue. Rather than
// re-sorting everything every time, we use the above knowledge to restrict
// sorting to the (hopefully smaller) tail of the |events_| staging area.
// At any time, the first partition of |events_| [0 .. sort_start_idx_) is
// ordered, and the second partition [sort_start_idx_ .. end] is not.
// We use a logarithmic bound search operation to figure out the index within
// the first partition where sorting should start, and sort all events from
// there to the end.
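//
// A minimal usage sketch (illustrative only; not lifted from real call
// sites):
//
//   TraceSorter sorter(context, TraceSorter::SortingMode::kDefault);
//   sorter.PushFtraceEvent(cpu, ts, std::move(tbv), state);  // any order
//   sorter.NotifyFlushEvent();        // tracing service Flush observed
//   sorter.NotifyReadBufferEvent();   // ReadBuffers observed; may trigger an
//                                     // incremental sort + extraction
//   ...
//   sorter.ExtractEventsForced();     // at EOF: sort and push everything left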
class TraceSorter {
 public:
  enum class SortingMode {
    kDefault,
    kFullSort,
  };
  enum class EventHandling {
    // Indicates that events should be sorted and pushed to the parsing stage.
    kSortAndPush,

    // Indicates that events should be sorted but then dropped before pushing
    // to the parsing stage.
    // Used for performance analysis of the sorter.
    kSortAndDrop,

    // Indicates that the events should be dropped as soon as they enter the
    // sorter.
    // Used in cases where we only want to perform tokenization: dropping data
    // when it hits the sorter is much cleaner than trying to handle this
    // at every different tokenizer.
    kDrop,
  };

  TraceSorter(TraceProcessorContext*,
              SortingMode,
              EventHandling = EventHandling::kSortAndPush);

  ~TraceSorter();

  SortingMode sorting_mode() const { return sorting_mode_; }

  void AddMachineContext(TraceProcessorContext* context) {
    sorter_data_by_machine_.emplace_back(context);
  }

  void PushAndroidDumpstateEvent(
      int64_t timestamp,
      const AndroidDumpstateEvent& event,
      std::optional<MachineId> machine_id = std::nullopt) {
    AppendNonFtraceEvent(timestamp,
                         TimestampedEvent::Type::kAndroidDumpstateEvent, event,
                         machine_id);
  }

  void PushAndroidLogEvent(int64_t timestamp,
                           const AndroidLogEvent& event,
                           std::optional<MachineId> machine_id = std::nullopt) {
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kAndroidLogEvent,
                         event, machine_id);
  }

  void PushPerfRecord(int64_t timestamp,
                      perf_importer::Record record,
                      std::optional<MachineId> machine_id = std::nullopt) {
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kPerfRecord,
                         std::move(record), machine_id);
  }

  void PushSpeRecord(int64_t timestamp,
                     TraceBlobView record,
                     std::optional<MachineId> machine_id = std::nullopt) {
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kSpeRecord,
                         std::move(record), machine_id);
  }

  void PushInstrumentsRow(int64_t timestamp,
                          const instruments_importer::Row& row,
                          std::optional<MachineId> machine_id = std::nullopt) {
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kInstrumentsRow,
                         row, machine_id);
  }

  void PushTracePacket(int64_t timestamp,
                       TracePacketData data,
                       std::optional<MachineId> machine_id = std::nullopt) {
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kTracePacket,
                         std::move(data), machine_id);
  }

  void PushTracePacket(int64_t timestamp,
                       RefPtr<PacketSequenceStateGeneration> state,
                       TraceBlobView tbv,
                       std::optional<MachineId> machine_id = std::nullopt) {
    PushTracePacket(timestamp,
                    TracePacketData{std::move(tbv), std::move(state)},
                    machine_id);
  }

  void PushJsonValue(int64_t timestamp,
                     std::string json_value,
                     std::optional<int64_t> dur = std::nullopt) {
    if (dur.has_value()) {
      // We need to account for slices with duration by sorting them first:
      // this requires the slower comparator, which takes the duration into
      // account.
      use_slow_sorting_ = true;
      AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kJsonValueWithDur,
                           JsonWithDurEvent{*dur, std::move(json_value)});
      return;
    }
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kJsonValue,
                         JsonEvent{std::move(json_value)});
  }

  void PushFuchsiaRecord(int64_t timestamp, FuchsiaRecord fuchsia_record) {
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kFuchsiaRecord,
                         std::move(fuchsia_record));
  }

  void PushSystraceLine(SystraceLine systrace_line) {
    AppendNonFtraceEvent(systrace_line.ts,
                         TimestampedEvent::Type::kSystraceLine,
                         std::move(systrace_line));
  }

  void PushTrackEventPacket(
      int64_t timestamp,
      TrackEventData track_event,
      std::optional<MachineId> machine_id = std::nullopt) {
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kTrackEvent,
                         std::move(track_event), machine_id);
  }

  void PushLegacyV8CpuProfileEvent(int64_t timestamp,
                                   uint64_t session_id,
                                   uint32_t pid,
                                   uint32_t tid,
                                   uint32_t callsite_id) {
    AppendNonFtraceEvent(
        timestamp, TimestampedEvent::Type::kLegacyV8CpuProfileEvent,
        LegacyV8CpuProfileEvent{session_id, pid, tid, callsite_id});
  }

  void PushGeckoEvent(int64_t timestamp,
                      const gecko_importer::GeckoEvent& event) {
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kGeckoEvent, event);
  }

  void PushArtMethodEvent(int64_t timestamp,
                          const art_method::ArtMethodEvent& event) {
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kArtMethodEvent,
                         event);
  }

  void PushPerfTextEvent(int64_t timestamp,
                         const perf_text_importer::PerfTextEvent& event) {
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kPerfTextEvent,
                         event);
  }

  void PushEtwEvent(uint32_t cpu,
                    int64_t timestamp,
                    TraceBlobView tbv,
                    RefPtr<PacketSequenceStateGeneration> state,
                    std::optional<MachineId> machine_id = std::nullopt) {
    if (PERFETTO_UNLIKELY(event_handling_ == EventHandling::kDrop)) {
      return;
    }
    TraceTokenBuffer::Id id =
        token_buffer_.Append(TracePacketData{std::move(tbv), std::move(state)});
    auto* queue = GetQueue(cpu + 1, machine_id);
    queue->Append(timestamp, TimestampedEvent::Type::kEtwEvent, id,
                  use_slow_sorting_);
    UpdateAppendMaxTs(queue);
  }

  void PushFtraceEvent(uint32_t cpu,
                       int64_t timestamp,
                       TraceBlobView tbv,
                       RefPtr<PacketSequenceStateGeneration> state,
                       std::optional<MachineId> machine_id = std::nullopt) {
    if (PERFETTO_UNLIKELY(event_handling_ == EventHandling::kDrop)) {
      return;
    }
    TraceTokenBuffer::Id id =
        token_buffer_.Append(TracePacketData{std::move(tbv), std::move(state)});
    auto* queue = GetQueue(cpu + 1, machine_id);
    queue->Append(timestamp, TimestampedEvent::Type::kFtraceEvent, id,
                  use_slow_sorting_);
    UpdateAppendMaxTs(queue);
  }

  void PushInlineFtraceEvent(
      uint32_t cpu,
      int64_t timestamp,
      const InlineSchedSwitch& inline_sched_switch,
      std::optional<MachineId> machine_id = std::nullopt) {
    if (PERFETTO_UNLIKELY(event_handling_ == EventHandling::kDrop)) {
      return;
    }
    // TODO(rsavitski): if a trace has a mix of normal & "compact" events
    // (being pushed through this function), the ftrace batches will no longer
    // be fully sorted by timestamp. In such situations, we will have to sort
    // at the end of the batch. We could do better, however, since both
    // sub-sequences are sorted: consider adding extra queues, or pushing
    // events in a merge-sort fashion instead.
    TraceTokenBuffer::Id id = token_buffer_.Append(inline_sched_switch);
    auto* queue = GetQueue(cpu + 1, machine_id);
    queue->Append(timestamp, TimestampedEvent::Type::kInlineSchedSwitch, id,
                  use_slow_sorting_);
    UpdateAppendMaxTs(queue);
  }

  void PushInlineFtraceEvent(
      uint32_t cpu,
      int64_t timestamp,
      const InlineSchedWaking& inline_sched_waking,
      std::optional<MachineId> machine_id = std::nullopt) {
    if (PERFETTO_UNLIKELY(event_handling_ == EventHandling::kDrop)) {
      return;
    }
    TraceTokenBuffer::Id id = token_buffer_.Append(inline_sched_waking);
    auto* queue = GetQueue(cpu + 1, machine_id);
    queue->Append(timestamp, TimestampedEvent::Type::kInlineSchedWaking, id,
                  use_slow_sorting_);
    UpdateAppendMaxTs(queue);
  }

  void ExtractEventsForced() {
    BumpAllocator::AllocId end_id = token_buffer_.PastTheEndAllocId();
    SortAndExtractEventsUntilAllocId(end_id);
    for (auto& sorter_data : sorter_data_by_machine_) {
      for (const auto& queue : sorter_data.queues) {
        PERFETTO_CHECK(queue.events_.empty());
      }
      sorter_data.queues.clear();
    }

    alloc_id_for_extraction_ = end_id;
    flushes_since_extraction_ = 0;
  }

  void NotifyFlushEvent() { flushes_since_extraction_++; }

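  // Called when a ReadBuffers event from the tracing service is observed.
  // Performs an incremental sort + extraction, unless the sorter is in
  // full-sort mode or fewer than two flushes have been seen since the last
  // extraction.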
  void NotifyReadBufferEvent() {
    if (sorting_mode_ == SortingMode::kFullSort ||
        flushes_since_extraction_ < 2) {
      return;
    }

    SortAndExtractEventsUntilAllocId(alloc_id_for_extraction_);
    alloc_id_for_extraction_ = token_buffer_.PastTheEndAllocId();
    flushes_since_extraction_ = 0;
  }

  int64_t max_timestamp() const { return append_max_ts_; }

 private:
  struct TimestampedEvent {
    enum class Type : uint8_t {
      kAndroidDumpstateEvent,
      kAndroidLogEvent,
      kEtwEvent,
      kFtraceEvent,
      kFuchsiaRecord,
      kInlineSchedSwitch,
      kInlineSchedWaking,
      kInstrumentsRow,
      kJsonValue,
      kJsonValueWithDur,
      kLegacyV8CpuProfileEvent,
      kPerfRecord,
      kSpeRecord,
      kSystraceLine,
      kTracePacket,
      kTrackEvent,
      kGeckoEvent,
      kArtMethodEvent,
      kPerfTextEvent,
      kMax = kPerfTextEvent,
    };

    // Number of bits required to store the max element in |Type|.
    static constexpr uint32_t kMaxTypeBits = 6;
    static_assert(static_cast<uint8_t>(Type::kMax) <= (1 << kMaxTypeBits),
                  "Max type does not fit inside storage");

    // The timestamp of this event.
    int64_t ts;

    // The fields of the BumpAllocator::AllocId for the tokenized object
    // corresponding to this event.
    uint64_t chunk_index : BumpAllocator::kChunkIndexAllocIdBits;
    uint64_t chunk_offset : BumpAllocator::kChunkOffsetAllocIdBits;

    // The type of this event. GCC7 does not like bit-field enums (see
    // https://stackoverflow.com/questions/36005063/gcc-suppress-warning-too-small-to-hold-all-values-of)
    // so use a uint64_t instead and cast to the enum type.
    uint64_t event_type : kMaxTypeBits;

    BumpAllocator::AllocId alloc_id() const {
      return BumpAllocator::AllocId{chunk_index, chunk_offset};
    }

    Type type() const { return static_cast<Type>(event_type); }

    // For std::lower_bound().
    static bool Compare(const TimestampedEvent& x, int64_t ts) {
      return x.ts < ts;
    }

    // For std::sort().
    bool operator<(const TimestampedEvent& evt) const {
      return std::tie(ts, chunk_index, chunk_offset) <
             std::tie(evt.ts, evt.chunk_index, evt.chunk_offset);
    }

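    // Comparator used when |use_slow_sorting_| is set (see PushJsonValue()):
    // ties on the timestamp are broken so that JSON events carrying an
    // explicit duration sort before other events at the same timestamp,
    // longest duration first.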
    struct SlowOperatorLess {
      // For std::sort() in slow mode.
      bool operator()(const TimestampedEvent& a,
                      const TimestampedEvent& b) const {
        int64_t a_key =
            a.type() == Type::kJsonValueWithDur
                ? std::numeric_limits<int64_t>::max() -
                      buffer.Get<JsonWithDurEvent>(GetTokenBufferId(a))->dur
                : std::numeric_limits<int64_t>::max();
        int64_t b_key =
            b.type() == Type::kJsonValueWithDur
                ? std::numeric_limits<int64_t>::max() -
                      buffer.Get<JsonWithDurEvent>(GetTokenBufferId(b))->dur
                : std::numeric_limits<int64_t>::max();
        return std::tie(a.ts, a_key, a.chunk_index, a.chunk_offset) <
               std::tie(b.ts, b_key, b.chunk_index, b.chunk_offset);
      }
      TraceTokenBuffer& buffer;
    };
  };

  static_assert(sizeof(TimestampedEvent) == 16,
                "TimestampedEvent must be equal to 16 bytes");
  static_assert(std::is_trivially_copyable_v<TimestampedEvent>,
                "TimestampedEvent must be trivially copyable");
  static_assert(std::is_trivially_move_assignable_v<TimestampedEvent>,
                "TimestampedEvent must be trivially move assignable");
  static_assert(std::is_trivially_move_constructible_v<TimestampedEvent>,
                "TimestampedEvent must be trivially move constructible");
  static_assert(std::is_nothrow_swappable_v<TimestampedEvent>,
                "TimestampedEvent must be nothrow swappable");

  struct Queue {
    void Append(int64_t ts,
                TimestampedEvent::Type type,
                TraceTokenBuffer::Id id,
                bool use_slow_sorting) {
      {
        TimestampedEvent event;
        event.ts = ts;
        event.chunk_index = id.alloc_id.chunk_index;
        event.chunk_offset = id.alloc_id.chunk_offset;
        event.event_type = static_cast<uint8_t>(type);
        events_.emplace_back(std::move(event));
      }

      // Events are often seen in order.
      if (PERFETTO_LIKELY(ts > max_ts_ ||
                          (!use_slow_sorting && ts == max_ts_))) {
        max_ts_ = ts;
      } else {
        // The event is breaking ordering. The first time it happens, keep
        // track of which index we are at. We know that everything before that
        // is sorted (because events were pushed monotonically). Everything
        // after that index, instead, will need a sorting pass before moving
        // events to the next pipeline stage.
        if (sort_start_idx_ == 0) {
          PERFETTO_DCHECK(events_.size() >= 2);
          sort_start_idx_ = events_.size() - 1;
          sort_min_ts_ = ts;
        } else {
          sort_min_ts_ = std::min(sort_min_ts_, ts);
        }
      }

      min_ts_ = std::min(min_ts_, ts);
      PERFETTO_DCHECK(min_ts_ <= max_ts_);
    }
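
    // Illustrative example of the bookkeeping above: pushing timestamps
    // 10, 20, 15, 18 leaves events_ == {10, 20, 15, 18} with
    // sort_start_idx_ == 2 and sort_min_ts_ == 15. The prefix [0, 2) is
    // still sorted, so Sort() only needs to re-sort the tail starting at the
    // first event with ts >= sort_min_ts_.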

    bool needs_sorting() const { return sort_start_idx_ != 0; }
    void Sort(TraceTokenBuffer&, bool use_slow_sorting);

    base::CircularQueue<TimestampedEvent> events_;
    int64_t min_ts_ = std::numeric_limits<int64_t>::max();
    int64_t max_ts_ = 0;
    size_t sort_start_idx_ = 0;
    int64_t sort_min_ts_ = std::numeric_limits<int64_t>::max();
  };

  void SortAndExtractEventsUntilAllocId(BumpAllocator::AllocId alloc_id);

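  // Returns the queue at |index| for the machine identified by |machine_id|
  // (std::nullopt selects the default machine). Index 0 is the non-ftrace
  // queue; index cpu + 1 holds the ftrace/ETW events for that cpu.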
  Queue* GetQueue(size_t index,
                  std::optional<MachineId> machine_id = std::nullopt) {
    // sorter_data_by_machine_[0] corresponds to the default machine.
    PERFETTO_DCHECK(sorter_data_by_machine_[0].machine_id == std::nullopt);
    auto* queues = &sorter_data_by_machine_[0].queues;

    // Find the TraceSorterData instance when |machine_id| is not nullopt.
    if (PERFETTO_UNLIKELY(machine_id.has_value())) {
      auto it = std::find_if(sorter_data_by_machine_.begin() + 1,
                             sorter_data_by_machine_.end(),
                             [machine_id](const TraceSorterData& item) {
                               return item.machine_id == machine_id;
                             });
      PERFETTO_DCHECK(it != sorter_data_by_machine_.end());
      queues = &it->queues;
    }

    if (PERFETTO_UNLIKELY(index >= queues->size()))
      queues->resize(index + 1);
    return &queues->at(index);
  }

  template <typename E>
  void AppendNonFtraceEvent(
      int64_t ts,
      TimestampedEvent::Type event_type,
      E&& evt,
      std::optional<MachineId> machine_id = std::nullopt) {
    if (PERFETTO_UNLIKELY(event_handling_ == EventHandling::kDrop)) {
      return;
    }
    TraceTokenBuffer::Id id = token_buffer_.Append(std::forward<E>(evt));
    AppendNonFtraceEventWithId(ts, event_type, id, machine_id);
  }

  void AppendNonFtraceEventWithId(int64_t ts,
                                  TimestampedEvent::Type event_type,
                                  TraceTokenBuffer::Id id,
                                  std::optional<MachineId> machine_id) {
    Queue* queue = GetQueue(0, machine_id);
    queue->Append(ts, event_type, id, use_slow_sorting_);
    UpdateAppendMaxTs(queue);
  }

  void UpdateAppendMaxTs(Queue* queue) {
    append_max_ts_ = std::max(append_max_ts_, queue->max_ts_);
  }

  void ParseTracePacket(TraceProcessorContext& context,
                        const TimestampedEvent&);
  void ParseFtracePacket(TraceProcessorContext& context,
                         uint32_t cpu,
                         const TimestampedEvent&);
  void ParseEtwPacket(TraceProcessorContext& context,
                      uint32_t cpu,
                      const TimestampedEvent&);

  void MaybeExtractEvent(size_t machine_idx,
                         size_t queue_idx,
                         const TimestampedEvent&);
  void ExtractAndDiscardTokenizedObject(const TimestampedEvent& event);

  static TraceTokenBuffer::Id GetTokenBufferId(const TimestampedEvent& event) {
    return TraceTokenBuffer::Id{event.alloc_id()};
  }

  struct TraceSorterData {
    explicit TraceSorterData(TraceProcessorContext* _machine_context)
        : machine_id(_machine_context->machine_id()),
          machine_context(_machine_context) {}
    std::optional<MachineId> machine_id;
    TraceProcessorContext* machine_context;
    // queues[0] is the general (non-ftrace) queue.
    // queues[1] is the ftrace queue for CPU(0).
    // queues[x] is the ftrace queue for CPU(x - 1).
    std::vector<Queue> queues;
  };
  std::vector<TraceSorterData> sorter_data_by_machine_;

  // Whether we should ignore incremental extraction and just wait for
  // forced extraction at the end of the trace.
  SortingMode sorting_mode_ = SortingMode::kDefault;

  std::shared_ptr<TraceStorage> storage_;

  // Buffer for storing tokenized objects while the corresponding events are
  // being sorted.
  TraceTokenBuffer token_buffer_;

  // The AllocId until which events should be extracted. Set based
  // on the AllocId in |OnReadBuffer|.
  BumpAllocator::AllocId alloc_id_for_extraction_ =
      token_buffer_.PastTheEndAllocId();

  // The number of flushes which have happened since the last incremental
  // extraction.
  uint32_t flushes_since_extraction_ = 0;

  // max(e.ts for e appended to the sorter)
  int64_t append_max_ts_ = 0;

  // How events pushed into the sorter should be handled.
  EventHandling event_handling_ = EventHandling::kSortAndPush;

  // max(e.ts for e pushed to next stage)
  int64_t latest_pushed_event_ts_ = std::numeric_limits<int64_t>::min();

  // Whether the slow comparator (TimestampedEvent::SlowOperatorLess) should
  // be used when sorting the queues.
  bool use_slow_sorting_ = false;
};

}  // namespace perfetto::trace_processor

#endif  // SRC_TRACE_PROCESSOR_SORTER_TRACE_SORTER_H_