xref: /aosp_15_r20/external/perfetto/src/trace_processor/sorter/trace_sorter_unittest.cc (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
#include "src/trace_processor/sorter/trace_sorter.h"

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <map>
#include <memory>
#include <optional>
#include <random>
#include <tuple>
#include <utility>
#include <vector>

#include "perfetto/trace_processor/basic_types.h"
#include "perfetto/trace_processor/trace_blob.h"
#include "perfetto/trace_processor/trace_blob_view.h"
#include "src/trace_processor/importers/common/parser_types.h"
#include "src/trace_processor/importers/proto/packet_sequence_state_generation.h"
#include "src/trace_processor/importers/proto/proto_trace_parser_impl.h"
#include "src/trace_processor/types/trace_processor_context.h"
#include "test/gtest_and_gmock.h"
30 
31 namespace perfetto {
32 namespace trace_processor {
33 namespace {
34 
35 using ::testing::_;
36 using ::testing::InSequence;
37 using ::testing::Invoke;
38 using ::testing::MockFunction;
39 using ::testing::NiceMock;
40 
41 constexpr std::optional<MachineId> kNullMachineId = std::nullopt;
42 
43 class MockTraceParser : public ProtoTraceParserImpl {
44  public:
MockTraceParser(TraceProcessorContext * context)45   explicit MockTraceParser(TraceProcessorContext* context)
46       : ProtoTraceParserImpl(context), machine_id_(context->machine_id()) {}
47 
48   MOCK_METHOD(void,
49               MOCK_ParseFtracePacket,
50               (uint32_t cpu,
51                int64_t timestamp,
52                const uint8_t* data,
53                size_t length,
54                std::optional<MachineId>));
55 
ParseFtraceEvent(uint32_t cpu,int64_t timestamp,TracePacketData data)56   void ParseFtraceEvent(uint32_t cpu,
57                         int64_t timestamp,
58                         TracePacketData data) override {
59     MOCK_ParseFtracePacket(cpu, timestamp, data.packet.data(),
60                            data.packet.length(), machine_id_);
61   }
62 
63   MOCK_METHOD(void,
64               MOCK_ParseTracePacket,
65               (int64_t ts, const uint8_t* data, size_t length));
66 
ParseTrackEvent(int64_t,TrackEventData)67   void ParseTrackEvent(int64_t, TrackEventData) override {}
68 
ParseTracePacket(int64_t ts,TracePacketData data)69   void ParseTracePacket(int64_t ts, TracePacketData data) override {
70     TraceBlobView& tbv = data.packet;
71     MOCK_ParseTracePacket(ts, tbv.data(), tbv.length());
72   }
73 
74   std::optional<MachineId> machine_id_;
75 };
76 
77 class MockTraceStorage : public TraceStorage {
78  public:
MockTraceStorage()79   MockTraceStorage() : TraceStorage() {}
80 
81   MOCK_METHOD(StringId, InternString, (base::StringView view), (override));
82 };
83 
84 class TraceSorterTest : public ::testing::Test {
85  public:
TraceSorterTest()86   TraceSorterTest() : test_buffer_(TraceBlob::Allocate(8)) {
87     storage_ = new NiceMock<MockTraceStorage>();
88     context_.storage.reset(storage_);
89     CreateSorter();
90   }
91 
CreateSorter(bool full_sort=true)92   void CreateSorter(bool full_sort = true) {
93     parser_ = new MockTraceParser(&context_);
94     context_.proto_trace_parser.reset(parser_);
95     auto sorting_mode = full_sort ? TraceSorter::SortingMode::kFullSort
96                                   : TraceSorter::SortingMode::kDefault;
97     context_.sorter.reset(new TraceSorter(&context_, sorting_mode));
98   }
99 
100  protected:
101   TraceProcessorContext context_;
102   MockTraceParser* parser_;
103   NiceMock<MockTraceStorage>* storage_;
104   TraceBlobView test_buffer_;
105 };
106 
TEST_F(TraceSorterTest,TestFtrace)107 TEST_F(TraceSorterTest, TestFtrace) {
108   auto state = PacketSequenceStateGeneration::CreateFirst(&context_);
109   TraceBlobView view = test_buffer_.slice_off(0, 1);
110   EXPECT_CALL(*parser_,
111               MOCK_ParseFtracePacket(0, 1000, view.data(), 1, kNullMachineId));
112   context_.sorter->PushFtraceEvent(0 /*cpu*/, 1000 /*timestamp*/,
113                                    std::move(view), state);
114   context_.sorter->ExtractEventsForced();
115 }
116 
TEST_F(TraceSorterTest,TestTracePacket)117 TEST_F(TraceSorterTest, TestTracePacket) {
118   auto state = PacketSequenceStateGeneration::CreateFirst(&context_);
119   TraceBlobView view = test_buffer_.slice_off(0, 1);
120   EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1000, view.data(), 1));
121   context_.sorter->PushTracePacket(1000, state, std::move(view));
122   context_.sorter->ExtractEventsForced();
123 }
124 
TEST_F(TraceSorterTest,Ordering)125 TEST_F(TraceSorterTest, Ordering) {
126   auto state = PacketSequenceStateGeneration::CreateFirst(&context_);
127   TraceBlobView view_1 = test_buffer_.slice_off(0, 1);
128   TraceBlobView view_2 = test_buffer_.slice_off(0, 2);
129   TraceBlobView view_3 = test_buffer_.slice_off(0, 3);
130   TraceBlobView view_4 = test_buffer_.slice_off(0, 4);
131 
132   InSequence s;
133 
134   EXPECT_CALL(*parser_, MOCK_ParseFtracePacket(0, 1000, view_1.data(), 1,
135                                                kNullMachineId));
136   EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1001, view_2.data(), 2));
137   EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1100, view_3.data(), 3));
138   EXPECT_CALL(*parser_, MOCK_ParseFtracePacket(2, 1200, view_4.data(), 4,
139                                                kNullMachineId));
140 
141   context_.sorter->PushFtraceEvent(2 /*cpu*/, 1200 /*timestamp*/,
142                                    std::move(view_4), state);
143   context_.sorter->PushTracePacket(1001, state, std::move(view_2));
144   context_.sorter->PushTracePacket(1100, state, std::move(view_3));
145   context_.sorter->PushFtraceEvent(0 /*cpu*/, 1000 /*timestamp*/,
146                                    std::move(view_1), state);
147   context_.sorter->ExtractEventsForced();
148 }
149 
TEST_F(TraceSorterTest,IncrementalExtraction)150 TEST_F(TraceSorterTest, IncrementalExtraction) {
151   CreateSorter(false);
152 
153   auto state = PacketSequenceStateGeneration::CreateFirst(&context_);
154 
155   TraceBlobView view_1 = test_buffer_.slice_off(0, 1);
156   TraceBlobView view_2 = test_buffer_.slice_off(0, 2);
157   TraceBlobView view_3 = test_buffer_.slice_off(0, 3);
158   TraceBlobView view_4 = test_buffer_.slice_off(0, 4);
159   TraceBlobView view_5 = test_buffer_.slice_off(0, 5);
160 
161   // Flush at the start of packet sequence to match behavior of the
162   // service.
163   context_.sorter->NotifyFlushEvent();
164   context_.sorter->PushTracePacket(1200, state, std::move(view_2));
165   context_.sorter->PushTracePacket(1100, state, std::move(view_1));
166 
167   // No data should be exttracted at this point because we haven't
168   // seen two flushes yet.
169   context_.sorter->NotifyReadBufferEvent();
170 
171   // Now that we've seen two flushes, we should be ready to start extracting
172   // data on the next OnReadBufer call (after two flushes as usual).
173   context_.sorter->NotifyFlushEvent();
174   context_.sorter->NotifyReadBufferEvent();
175 
176   context_.sorter->NotifyFlushEvent();
177   context_.sorter->NotifyFlushEvent();
178   context_.sorter->PushTracePacket(1400, state, std::move(view_4));
179   context_.sorter->PushTracePacket(1300, state, std::move(view_3));
180 
181   // This ReadBuffer call should finally extract until the first OnReadBuffer
182   // call.
183   {
184     InSequence s;
185     EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1100, test_buffer_.data(), 1));
186     EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1200, test_buffer_.data(), 2));
187   }
188   context_.sorter->NotifyReadBufferEvent();
189 
190   context_.sorter->NotifyFlushEvent();
191   context_.sorter->PushTracePacket(1500, state, std::move(view_5));
192 
193   // Nothing should be extracted as we haven't seen the second flush.
194   context_.sorter->NotifyReadBufferEvent();
195 
196   // Now we've seen the second flush we should extract the next two packets.
197   context_.sorter->NotifyFlushEvent();
198   {
199     InSequence s;
200     EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1300, test_buffer_.data(), 3));
201     EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1400, test_buffer_.data(), 4));
202   }
203   context_.sorter->NotifyReadBufferEvent();
204 
205   // The forced extraction should get the last packet.
206   EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1500, test_buffer_.data(), 5));
207   context_.sorter->ExtractEventsForced();
208 }
209 
210 // Simulate a producer bug where the third packet is emitted
211 // out of order. Verify that we track the stats correctly.
TEST_F(TraceSorterTest,OutOfOrder)212 TEST_F(TraceSorterTest, OutOfOrder) {
213   CreateSorter(false);
214 
215   auto state = PacketSequenceStateGeneration::CreateFirst(&context_);
216 
217   TraceBlobView view_1 = test_buffer_.slice_off(0, 1);
218   TraceBlobView view_2 = test_buffer_.slice_off(0, 2);
219   TraceBlobView view_3 = test_buffer_.slice_off(0, 3);
220   TraceBlobView view_4 = test_buffer_.slice_off(0, 4);
221 
222   context_.sorter->NotifyFlushEvent();
223   context_.sorter->NotifyFlushEvent();
224   context_.sorter->PushTracePacket(1200, state, std::move(view_2));
225   context_.sorter->PushTracePacket(1100, state, std::move(view_1));
226   context_.sorter->NotifyReadBufferEvent();
227 
228   // Both of the packets should have been pushed through.
229   context_.sorter->NotifyFlushEvent();
230   context_.sorter->NotifyFlushEvent();
231   {
232     InSequence s;
233     EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1100, test_buffer_.data(), 1));
234     EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1200, test_buffer_.data(), 2));
235   }
236   context_.sorter->NotifyReadBufferEvent();
237 
238   // Now, pass the third packet out of order.
239   context_.sorter->NotifyFlushEvent();
240   context_.sorter->NotifyFlushEvent();
241   context_.sorter->PushTracePacket(1150, state, std::move(view_3));
242   context_.sorter->NotifyReadBufferEvent();
243 
244   // The third packet should still be pushed through.
245   context_.sorter->NotifyFlushEvent();
246   context_.sorter->NotifyFlushEvent();
247   EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1150, test_buffer_.data(), 3));
248   context_.sorter->NotifyReadBufferEvent();
249 
250   // But we should also increment the stat that this was out of order.
251   ASSERT_EQ(
252       context_.storage->stats()[stats::sorter_push_event_out_of_order].value,
253       1);
254 
255   // Push the fourth packet also out of order but after third.
256   context_.sorter->NotifyFlushEvent();
257   context_.sorter->NotifyFlushEvent();
258   context_.sorter->PushTracePacket(1170, state, std::move(view_4));
259   context_.sorter->NotifyReadBufferEvent();
260 
261   // The fourt packet should still be pushed through.
262   EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1170, test_buffer_.data(), 4));
263   context_.sorter->ExtractEventsForced();
264 
265   // But we should also increment the stat that this was out of order.
266   ASSERT_EQ(
267       context_.storage->stats()[stats::sorter_push_event_out_of_order].value,
268       2);
269 }
270 
271 // Simulates a random stream of ftrace events happening on random CPUs.
272 // Tests that the output of the TraceSorter matches the timestamp order
273 // (% events happening at the same time on different CPUs).
TEST_F(TraceSorterTest,MultiQueueSorting)274 TEST_F(TraceSorterTest, MultiQueueSorting) {
275   auto state = PacketSequenceStateGeneration::CreateFirst(&context_);
276   std::minstd_rand0 rnd_engine(0);
277   std::map<int64_t /*ts*/, std::vector<uint32_t /*cpu*/>> expectations;
278 
279   EXPECT_CALL(*parser_, MOCK_ParseFtracePacket(_, _, _, _, _))
280       .WillRepeatedly(Invoke([&expectations](uint32_t cpu, int64_t timestamp,
281                                              const uint8_t*, size_t,
282                                              std::optional<MachineId>) {
283         EXPECT_EQ(expectations.begin()->first, timestamp);
284         auto& cpus = expectations.begin()->second;
285         bool cpu_found = false;
286         for (auto it = cpus.begin(); it < cpus.end(); it++) {
287           if (*it != cpu)
288             continue;
289           cpu_found = true;
290           cpus.erase(it);
291           break;
292         }
293         if (cpus.empty())
294           expectations.erase(expectations.begin());
295         EXPECT_TRUE(cpu_found);
296       }));
297 
298   // Allocate a 1000 byte trace blob and push one byte chunks to be sorted with
299   // random timestamps. This will stress test the sorter with worst case
300   // scenarios and will (and has many times) expose any subtle bugs hiding in
301   // the sorter logic.
302   TraceBlobView tbv(TraceBlob::Allocate(1000));
303   for (uint16_t i = 0; i < 1000; i++) {
304     int64_t ts = abs(static_cast<int64_t>(rnd_engine()));
305     uint8_t num_cpus = rnd_engine() % 3;
306     for (uint8_t j = 0; j < num_cpus; j++) {
307       uint32_t cpu = static_cast<uint32_t>(rnd_engine() % 32);
308       expectations[ts].push_back(cpu);
309       context_.sorter->PushFtraceEvent(cpu, ts, tbv.slice_off(i, 1), state);
310     }
311   }
312 
313   context_.sorter->ExtractEventsForced();
314   EXPECT_TRUE(expectations.empty());
315 }
316 
317 // An generalized version of MultiQueueSorting with multiple machines.
TEST_F(TraceSorterTest,MultiMachineSorting)318 TEST_F(TraceSorterTest, MultiMachineSorting) {
319   auto state = PacketSequenceStateGeneration::CreateFirst(&context_);
320   std::minstd_rand0 rnd_engine(0);
321 
322   struct ExpectedMachineAndCpu {
323     std::optional<MachineId> machine_id;
324     uint32_t cpu;
325 
326     bool operator==(const ExpectedMachineAndCpu& other) const {
327       return std::tie(machine_id, cpu) == std::tie(other.machine_id, other.cpu);
328     }
329     bool operator!=(const ExpectedMachineAndCpu& other) const {
330       return !operator==(other);
331     }
332   };
333   std::map<int64_t /*ts*/, std::vector<ExpectedMachineAndCpu>> expectations;
334 
335   // The total number of machines (including the default one).
336   constexpr size_t num_machines = 5;
337   std::vector<MockTraceParser*> extra_parsers;
338   std::vector<std::unique_ptr<TraceProcessorContext>> extra_contexts;
339   // Set up extra machines and add to the sorter.
340   // MachineIdValue are 1..(num_machines-1).
341   for (auto i = 1u; i < num_machines; i++) {
342     TraceProcessorContext::InitArgs args{context_.config, context_.storage, i};
343     auto ctx = std::make_unique<TraceProcessorContext>(args);
344     auto parser = std::make_unique<MockTraceParser>(ctx.get());
345     extra_parsers.push_back(parser.get());
346     ctx->proto_trace_parser = std::move(parser);
347     extra_contexts.push_back(std::move(ctx));
348     context_.sorter->AddMachineContext(extra_contexts.back().get());
349   }
350 
351   // Set up the expectation for the default machine.
352   EXPECT_CALL(*parser_, MOCK_ParseFtracePacket(_, _, _, _, _))
353       .WillRepeatedly(Invoke([&expectations](uint32_t cpu, int64_t timestamp,
354                                              const uint8_t*, size_t,
355                                              std::optional<MachineId>) {
356         EXPECT_EQ(expectations.begin()->first, timestamp);
357         auto& machines_and_cpus = expectations.begin()->second;
358         bool found = false;
359         for (auto it = machines_and_cpus.begin(); it < machines_and_cpus.end();
360              it++) {
361           // The default machine is called machine ID == std::nullopt.
362           if (*it != ExpectedMachineAndCpu{kNullMachineId, cpu})
363             continue;
364           found = true;
365           machines_and_cpus.erase(it);
366           break;
367         }
368         if (machines_and_cpus.empty())
369           expectations.erase(expectations.begin());
370         EXPECT_TRUE(found);
371       }));
372   // Set up expectations for remote machines.
373   for (auto* parser : extra_parsers) {
374     EXPECT_CALL(*parser, MOCK_ParseFtracePacket(_, _, _, _, _))
375         .WillRepeatedly(Invoke(
376             [&expectations](uint32_t cpu, int64_t timestamp, const uint8_t*,
377                             size_t, std::optional<MachineId> machine_id) {
378               EXPECT_TRUE(machine_id.has_value());
379               EXPECT_EQ(expectations.begin()->first, timestamp);
380               auto& machines_and_cpus = expectations.begin()->second;
381               bool found = false;
382               for (auto it = machines_and_cpus.begin();
383                    it < machines_and_cpus.end(); it++) {
384                 // Remote machines are called with non-null machine_id.
385                 if (*it != ExpectedMachineAndCpu{machine_id, cpu})
386                   continue;
387                 found = true;
388                 machines_and_cpus.erase(it);
389                 break;
390               }
391               if (machines_and_cpus.empty())
392                 expectations.erase(expectations.begin());
393               EXPECT_TRUE(found);
394             }));
395   }
396 
397   // Allocate a 1000 byte trace blob (per-machine) and push one byte chunks to
398   // be sorted with random timestamps.
399   constexpr size_t alloc_size = 1000;
400   TraceBlobView tbv(TraceBlob::Allocate(alloc_size * num_machines));
401   for (size_t m = 0; m < num_machines; m++) {
402     // TraceProcessorContext::machine_id is nullopt for the default machine or a
403     // monotonic counter starting from 1. 0 is a reserved value that isn't used.
404     std::optional<MachineId> machine;
405     if (m)
406       machine = extra_contexts[m - 1]->machine_id();
407 
408     for (uint16_t i = 0; i < alloc_size; i++) {
409       int64_t ts = abs(static_cast<int64_t>(rnd_engine()));
410       uint8_t num_cpus = rnd_engine() % 3;
411       for (uint8_t j = 0; j < num_cpus; j++) {
412         uint32_t cpu = static_cast<uint32_t>(rnd_engine() % 32);
413         expectations[ts].push_back(ExpectedMachineAndCpu{machine, cpu});
414         context_.sorter->PushFtraceEvent(
415             cpu, ts, tbv.slice_off(m * alloc_size + i, 1), state, machine);
416       }
417     }
418   }
419 
420   context_.sorter->ExtractEventsForced();
421   EXPECT_TRUE(expectations.empty());
422 }
423 
424 }  // namespace
425 }  // namespace trace_processor
426 }  // namespace perfetto
427