1 /*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "src/trace_processor/sorter/trace_sorter.h"
17
18 #include <map>
19 #include <random>
20 #include <vector>
21
22 #include "perfetto/trace_processor/basic_types.h"
23 #include "perfetto/trace_processor/trace_blob.h"
24 #include "perfetto/trace_processor/trace_blob_view.h"
25 #include "src/trace_processor/importers/common/parser_types.h"
26 #include "src/trace_processor/importers/proto/packet_sequence_state_generation.h"
27 #include "src/trace_processor/importers/proto/proto_trace_parser_impl.h"
28 #include "src/trace_processor/types/trace_processor_context.h"
29 #include "test/gtest_and_gmock.h"
30
31 namespace perfetto {
32 namespace trace_processor {
33 namespace {
34
35 using ::testing::_;
36 using ::testing::InSequence;
37 using ::testing::Invoke;
38 using ::testing::MockFunction;
39 using ::testing::NiceMock;
40
// Machine ID used for events originating on the default (local) machine.
constexpr std::optional<MachineId> kNullMachineId = std::nullopt;
42
// Test double for the proto trace parser. The real virtual entry points take
// move-only TracePacketData, which gmock cannot match on directly; each
// override therefore unwraps its argument and forwards to a plain MOCK_*
// method taking copyable raw pointer/length arguments.
class MockTraceParser : public ProtoTraceParserImpl {
 public:
  explicit MockTraceParser(TraceProcessorContext* context)
      : ProtoTraceParserImpl(context), machine_id_(context->machine_id()) {}

  MOCK_METHOD(void,
              MOCK_ParseFtracePacket,
              (uint32_t cpu,
               int64_t timestamp,
               const uint8_t* data,
               size_t length,
               std::optional<MachineId>));

  // Forwards to MOCK_ParseFtracePacket, tagging the call with the machine id
  // captured from the context at construction time.
  void ParseFtraceEvent(uint32_t cpu,
                        int64_t timestamp,
                        TracePacketData data) override {
    MOCK_ParseFtracePacket(cpu, timestamp, data.packet.data(),
                           data.packet.length(), machine_id_);
  }

  MOCK_METHOD(void,
              MOCK_ParseTracePacket,
              (int64_t ts, const uint8_t* data, size_t length));

  // Track events are irrelevant to these sorter tests; swallow them.
  void ParseTrackEvent(int64_t, TrackEventData) override {}

  void ParseTracePacket(int64_t ts, TracePacketData data) override {
    TraceBlobView& tbv = data.packet;
    MOCK_ParseTracePacket(ts, tbv.data(), tbv.length());
  }

  // std::nullopt for the default machine, otherwise the remote machine's id.
  std::optional<MachineId> machine_id_;
};
76
// TraceStorage with a mockable InternString. The fixture wraps it in
// NiceMock so uninteresting interning calls don't produce gmock warnings.
class MockTraceStorage : public TraceStorage {
 public:
  MockTraceStorage() : TraceStorage() {}

  MOCK_METHOD(StringId, InternString, (base::StringView view), (override));
};
83
84 class TraceSorterTest : public ::testing::Test {
85 public:
TraceSorterTest()86 TraceSorterTest() : test_buffer_(TraceBlob::Allocate(8)) {
87 storage_ = new NiceMock<MockTraceStorage>();
88 context_.storage.reset(storage_);
89 CreateSorter();
90 }
91
CreateSorter(bool full_sort=true)92 void CreateSorter(bool full_sort = true) {
93 parser_ = new MockTraceParser(&context_);
94 context_.proto_trace_parser.reset(parser_);
95 auto sorting_mode = full_sort ? TraceSorter::SortingMode::kFullSort
96 : TraceSorter::SortingMode::kDefault;
97 context_.sorter.reset(new TraceSorter(&context_, sorting_mode));
98 }
99
100 protected:
101 TraceProcessorContext context_;
102 MockTraceParser* parser_;
103 NiceMock<MockTraceStorage>* storage_;
104 TraceBlobView test_buffer_;
105 };
106
// A single ftrace event pushed and force-extracted must reach the parser
// unchanged (same cpu, timestamp, payload and null machine id).
TEST_F(TraceSorterTest, TestFtrace) {
  constexpr uint32_t kCpu = 0;
  constexpr int64_t kTimestamp = 1000;

  auto seq_state = PacketSequenceStateGeneration::CreateFirst(&context_);
  TraceBlobView packet = test_buffer_.slice_off(0, 1);
  const uint8_t* payload = packet.data();

  // The expectation must be in place before the sorter flushes the event.
  EXPECT_CALL(*parser_, MOCK_ParseFtracePacket(kCpu, kTimestamp, payload, 1,
                                               kNullMachineId));
  context_.sorter->PushFtraceEvent(kCpu, kTimestamp, std::move(packet),
                                   seq_state);
  context_.sorter->ExtractEventsForced();
}
116
// A single trace packet pushed and force-extracted must reach the parser
// with its original timestamp and payload.
TEST_F(TraceSorterTest, TestTracePacket) {
  constexpr int64_t kTimestamp = 1000;

  auto seq_state = PacketSequenceStateGeneration::CreateFirst(&context_);
  TraceBlobView packet = test_buffer_.slice_off(0, 1);
  const uint8_t* payload = packet.data();

  EXPECT_CALL(*parser_, MOCK_ParseTracePacket(kTimestamp, payload, 1));
  context_.sorter->PushTracePacket(kTimestamp, seq_state, std::move(packet));
  context_.sorter->ExtractEventsForced();
}
124
// Pushes a mix of ftrace events and trace packets out of timestamp order and
// verifies the sorter re-emits them strictly sorted by timestamp.
TEST_F(TraceSorterTest, Ordering) {
  auto seq_state = PacketSequenceStateGeneration::CreateFirst(&context_);
  TraceBlobView ftrace_ts1000 = test_buffer_.slice_off(0, 1);
  TraceBlobView packet_ts1001 = test_buffer_.slice_off(0, 2);
  TraceBlobView packet_ts1100 = test_buffer_.slice_off(0, 3);
  TraceBlobView ftrace_ts1200 = test_buffer_.slice_off(0, 4);

  // Expectations are declared in timestamp order: that is the order the
  // sorter must produce, regardless of push order.
  InSequence s;
  EXPECT_CALL(*parser_, MOCK_ParseFtracePacket(0, 1000, ftrace_ts1000.data(),
                                               1, kNullMachineId));
  EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1001, packet_ts1001.data(), 2));
  EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1100, packet_ts1100.data(), 3));
  EXPECT_CALL(*parser_, MOCK_ParseFtracePacket(2, 1200, ftrace_ts1200.data(),
                                               4, kNullMachineId));

  // Push in scrambled order: latest event first, earliest last.
  context_.sorter->PushFtraceEvent(/*cpu=*/2, /*timestamp=*/1200,
                                   std::move(ftrace_ts1200), seq_state);
  context_.sorter->PushTracePacket(1001, seq_state, std::move(packet_ts1001));
  context_.sorter->PushTracePacket(1100, seq_state, std::move(packet_ts1100));
  context_.sorter->PushFtraceEvent(/*cpu=*/0, /*timestamp=*/1000,
                                   std::move(ftrace_ts1000), seq_state);
  context_.sorter->ExtractEventsForced();
}
149
// Exercises the incremental (non-full-sort) mode: events only become eligible
// for extraction after two NotifyFlushEvent() calls followed by a
// NotifyReadBufferEvent(), mirroring the tracing service's flush protocol.
TEST_F(TraceSorterTest, IncrementalExtraction) {
  CreateSorter(false);

  auto state = PacketSequenceStateGeneration::CreateFirst(&context_);

  TraceBlobView view_1 = test_buffer_.slice_off(0, 1);
  TraceBlobView view_2 = test_buffer_.slice_off(0, 2);
  TraceBlobView view_3 = test_buffer_.slice_off(0, 3);
  TraceBlobView view_4 = test_buffer_.slice_off(0, 4);
  TraceBlobView view_5 = test_buffer_.slice_off(0, 5);

  // Flush at the start of packet sequence to match behavior of the
  // service.
  context_.sorter->NotifyFlushEvent();
  context_.sorter->PushTracePacket(1200, state, std::move(view_2));
  context_.sorter->PushTracePacket(1100, state, std::move(view_1));

  // No data should be extracted at this point because we haven't
  // seen two flushes yet.
  context_.sorter->NotifyReadBufferEvent();

  // Now that we've seen two flushes, we should be ready to start extracting
  // data on the next OnReadBuffer call (after two flushes as usual).
  context_.sorter->NotifyFlushEvent();
  context_.sorter->NotifyReadBufferEvent();

  context_.sorter->NotifyFlushEvent();
  context_.sorter->NotifyFlushEvent();
  context_.sorter->PushTracePacket(1400, state, std::move(view_4));
  context_.sorter->PushTracePacket(1300, state, std::move(view_3));

  // This ReadBuffer call should finally extract until the first OnReadBuffer
  // call.
  {
    InSequence s;
    EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1100, test_buffer_.data(), 1));
    EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1200, test_buffer_.data(), 2));
  }
  context_.sorter->NotifyReadBufferEvent();

  context_.sorter->NotifyFlushEvent();
  context_.sorter->PushTracePacket(1500, state, std::move(view_5));

  // Nothing should be extracted as we haven't seen the second flush.
  context_.sorter->NotifyReadBufferEvent();

  // Now we've seen the second flush we should extract the next two packets.
  context_.sorter->NotifyFlushEvent();
  {
    InSequence s;
    EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1300, test_buffer_.data(), 3));
    EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1400, test_buffer_.data(), 4));
  }
  context_.sorter->NotifyReadBufferEvent();

  // The forced extraction should get the last packet.
  EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1500, test_buffer_.data(), 5));
  context_.sorter->ExtractEventsForced();
}
209
210 // Simulate a producer bug where the third packet is emitted
211 // out of order. Verify that we track the stats correctly.
TEST_F(TraceSorterTest, OutOfOrder) {
  CreateSorter(false);

  auto state = PacketSequenceStateGeneration::CreateFirst(&context_);

  TraceBlobView view_1 = test_buffer_.slice_off(0, 1);
  TraceBlobView view_2 = test_buffer_.slice_off(0, 2);
  TraceBlobView view_3 = test_buffer_.slice_off(0, 3);
  TraceBlobView view_4 = test_buffer_.slice_off(0, 4);

  context_.sorter->NotifyFlushEvent();
  context_.sorter->NotifyFlushEvent();
  context_.sorter->PushTracePacket(1200, state, std::move(view_2));
  context_.sorter->PushTracePacket(1100, state, std::move(view_1));
  context_.sorter->NotifyReadBufferEvent();

  // Both of the packets should have been pushed through.
  context_.sorter->NotifyFlushEvent();
  context_.sorter->NotifyFlushEvent();
  {
    InSequence s;
    EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1100, test_buffer_.data(), 1));
    EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1200, test_buffer_.data(), 2));
  }
  context_.sorter->NotifyReadBufferEvent();

  // Now, pass the third packet out of order: ts 1150 is earlier than the
  // already-extracted ts 1200.
  context_.sorter->NotifyFlushEvent();
  context_.sorter->NotifyFlushEvent();
  context_.sorter->PushTracePacket(1150, state, std::move(view_3));
  context_.sorter->NotifyReadBufferEvent();

  // The third packet should still be pushed through.
  context_.sorter->NotifyFlushEvent();
  context_.sorter->NotifyFlushEvent();
  EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1150, test_buffer_.data(), 3));
  context_.sorter->NotifyReadBufferEvent();

  // But we should also increment the stat that this was out of order.
  ASSERT_EQ(
      context_.storage->stats()[stats::sorter_push_event_out_of_order].value,
      1);

  // Push the fourth packet also out of order but after third.
  context_.sorter->NotifyFlushEvent();
  context_.sorter->NotifyFlushEvent();
  context_.sorter->PushTracePacket(1170, state, std::move(view_4));
  context_.sorter->NotifyReadBufferEvent();

  // The fourth packet should still be pushed through.
  EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1170, test_buffer_.data(), 4));
  context_.sorter->ExtractEventsForced();

  // But we should also increment the stat that this was out of order.
  ASSERT_EQ(
      context_.storage->stats()[stats::sorter_push_event_out_of_order].value,
      2);
}
270
271 // Simulates a random stream of ftrace events happening on random CPUs.
272 // Tests that the output of the TraceSorter matches the timestamp order
273 // (% events happening at the same time on different CPUs).
TEST_F(TraceSorterTest, MultiQueueSorting) {
  auto state = PacketSequenceStateGeneration::CreateFirst(&context_);
  std::minstd_rand0 rnd_engine(0);
  // For each timestamp, the multiset of CPUs expected to be emitted at that
  // timestamp (relative order of equal-ts events across CPUs is unspecified).
  std::map<int64_t /*ts*/, std::vector<uint32_t /*cpu*/>> expectations;

  EXPECT_CALL(*parser_, MOCK_ParseFtracePacket(_, _, _, _, _))
      .WillRepeatedly(Invoke([&expectations](uint32_t cpu, int64_t timestamp,
                                             const uint8_t*, size_t,
                                             std::optional<MachineId>) {
        // Every emitted event must carry the smallest pending timestamp.
        EXPECT_EQ(expectations.begin()->first, timestamp);
        auto& cpus = expectations.begin()->second;
        bool cpu_found = false;
        // Remove one matching CPU entry for this timestamp.
        for (auto it = cpus.begin(); it < cpus.end(); it++) {
          if (*it != cpu)
            continue;
          cpu_found = true;
          cpus.erase(it);
          break;
        }
        if (cpus.empty())
          expectations.erase(expectations.begin());
        EXPECT_TRUE(cpu_found);
      }));

  // Allocate a 1000 byte trace blob and push one byte chunks to be sorted with
  // random timestamps. This will stress test the sorter with worst case
  // scenarios and will (and has many times) expose any subtle bugs hiding in
  // the sorter logic.
  TraceBlobView tbv(TraceBlob::Allocate(1000));
  for (uint16_t i = 0; i < 1000; i++) {
    int64_t ts = abs(static_cast<int64_t>(rnd_engine()));
    uint8_t num_cpus = rnd_engine() % 3;
    for (uint8_t j = 0; j < num_cpus; j++) {
      uint32_t cpu = static_cast<uint32_t>(rnd_engine() % 32);
      expectations[ts].push_back(cpu);
      context_.sorter->PushFtraceEvent(cpu, ts, tbv.slice_off(i, 1), state);
    }
  }

  context_.sorter->ExtractEventsForced();
  // The forced extraction must have drained every expected event.
  EXPECT_TRUE(expectations.empty());
}
316
// A generalized version of MultiQueueSorting with multiple machines.
TEST_F(TraceSorterTest, MultiMachineSorting) {
  auto state = PacketSequenceStateGeneration::CreateFirst(&context_);
  std::minstd_rand0 rnd_engine(0);

  // Key for checking that an event was attributed to the correct machine as
  // well as the correct CPU.
  struct ExpectedMachineAndCpu {
    std::optional<MachineId> machine_id;
    uint32_t cpu;

    bool operator==(const ExpectedMachineAndCpu& other) const {
      return std::tie(machine_id, cpu) == std::tie(other.machine_id, other.cpu);
    }
    bool operator!=(const ExpectedMachineAndCpu& other) const {
      return !operator==(other);
    }
  };
  std::map<int64_t /*ts*/, std::vector<ExpectedMachineAndCpu>> expectations;

  // The total number of machines (including the default one).
  constexpr size_t num_machines = 5;
  std::vector<MockTraceParser*> extra_parsers;
  std::vector<std::unique_ptr<TraceProcessorContext>> extra_contexts;
  // Set up extra machines and add to the sorter.
  // MachineIdValue are 1..(num_machines-1).
  for (auto i = 1u; i < num_machines; i++) {
    TraceProcessorContext::InitArgs args{context_.config, context_.storage, i};
    auto ctx = std::make_unique<TraceProcessorContext>(args);
    auto parser = std::make_unique<MockTraceParser>(ctx.get());
    extra_parsers.push_back(parser.get());
    ctx->proto_trace_parser = std::move(parser);
    extra_contexts.push_back(std::move(ctx));
    context_.sorter->AddMachineContext(extra_contexts.back().get());
  }

  // Set up the expectation for the default machine.
  EXPECT_CALL(*parser_, MOCK_ParseFtracePacket(_, _, _, _, _))
      .WillRepeatedly(Invoke([&expectations](uint32_t cpu, int64_t timestamp,
                                             const uint8_t*, size_t,
                                             std::optional<MachineId>) {
        // Every emitted event must carry the smallest pending timestamp.
        EXPECT_EQ(expectations.begin()->first, timestamp);
        auto& machines_and_cpus = expectations.begin()->second;
        bool found = false;
        for (auto it = machines_and_cpus.begin(); it < machines_and_cpus.end();
             it++) {
          // The default machine is called machine ID == std::nullopt.
          if (*it != ExpectedMachineAndCpu{kNullMachineId, cpu})
            continue;
          found = true;
          machines_and_cpus.erase(it);
          break;
        }
        if (machines_and_cpus.empty())
          expectations.erase(expectations.begin());
        EXPECT_TRUE(found);
      }));
  // Set up expectations for remote machines.
  for (auto* parser : extra_parsers) {
    EXPECT_CALL(*parser, MOCK_ParseFtracePacket(_, _, _, _, _))
        .WillRepeatedly(Invoke(
            [&expectations](uint32_t cpu, int64_t timestamp, const uint8_t*,
                            size_t, std::optional<MachineId> machine_id) {
              EXPECT_TRUE(machine_id.has_value());
              EXPECT_EQ(expectations.begin()->first, timestamp);
              auto& machines_and_cpus = expectations.begin()->second;
              bool found = false;
              for (auto it = machines_and_cpus.begin();
                   it < machines_and_cpus.end(); it++) {
                // Remote machines are called with non-null machine_id.
                if (*it != ExpectedMachineAndCpu{machine_id, cpu})
                  continue;
                found = true;
                machines_and_cpus.erase(it);
                break;
              }
              if (machines_and_cpus.empty())
                expectations.erase(expectations.begin());
              EXPECT_TRUE(found);
            }));
  }

  // Allocate a 1000 byte trace blob (per-machine) and push one byte chunks to
  // be sorted with random timestamps.
  constexpr size_t alloc_size = 1000;
  TraceBlobView tbv(TraceBlob::Allocate(alloc_size * num_machines));
  for (size_t m = 0; m < num_machines; m++) {
    // TraceProcessorContext::machine_id is nullopt for the default machine or a
    // monotonic counter starting from 1. 0 is a reserved value that isn't used.
    std::optional<MachineId> machine;
    if (m)
      machine = extra_contexts[m - 1]->machine_id();

    for (uint16_t i = 0; i < alloc_size; i++) {
      int64_t ts = abs(static_cast<int64_t>(rnd_engine()));
      uint8_t num_cpus = rnd_engine() % 3;
      for (uint8_t j = 0; j < num_cpus; j++) {
        uint32_t cpu = static_cast<uint32_t>(rnd_engine() % 32);
        expectations[ts].push_back(ExpectedMachineAndCpu{machine, cpu});
        context_.sorter->PushFtraceEvent(
            cpu, ts, tbv.slice_off(m * alloc_size + i, 1), state, machine);
      }
    }
  }

  context_.sorter->ExtractEventsForced();
  // The forced extraction must have drained every expected event.
  EXPECT_TRUE(expectations.empty());
}
423
424 } // namespace
425 } // namespace trace_processor
426 } // namespace perfetto
427