1 /*
2 * Copyright (C) 2023 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/trace_processor/importers/perf/aux_stream_manager.h"
18
19 #include <cinttypes>
20 #include <cstdint>
21 #include <functional>
22 #include <memory>
23 #include <utility>
24 #include <variant>
25
26 #include "perfetto/base/logging.h"
27 #include "perfetto/base/status.h"
28 #include "perfetto/trace_processor/trace_blob_view.h"
29 #include "src/trace_processor/importers/perf/aux_data_tokenizer.h"
30 #include "src/trace_processor/importers/perf/aux_record.h"
31 #include "src/trace_processor/importers/perf/auxtrace_info_record.h"
32 #include "src/trace_processor/importers/perf/auxtrace_record.h"
33 #include "src/trace_processor/importers/perf/itrace_start_record.h"
34 #include "src/trace_processor/importers/perf/perf_event.h"
35 #include "src/trace_processor/importers/perf/perf_tracker.h"
36 #include "src/trace_processor/storage/trace_storage.h"
37 #include "src/trace_processor/types/trace_processor_context.h"
38 #include "src/trace_processor/util/status_macros.h"
39
40 namespace perfetto::trace_processor::perf_importer {
41
42 base::StatusOr<std::reference_wrapper<AuxStream>>
GetOrCreateStreamForSampleId(const std::optional<SampleId> & sample_id)43 AuxStreamManager::GetOrCreateStreamForSampleId(
44 const std::optional<SampleId>& sample_id) {
45 if (!sample_id.has_value() || !sample_id->cpu().has_value()) {
46 return base::ErrStatus(
47 "Aux data handling only implemented for per cpu data.");
48 }
49 return GetOrCreateStreamForCpu(*sample_id->cpu());
50 }
51
OnAuxtraceInfoRecord(AuxtraceInfoRecord info)52 base::Status AuxStreamManager::OnAuxtraceInfoRecord(AuxtraceInfoRecord info) {
53 if (tokenizer_) {
54 return base::ErrStatus("Multiple PERF_RECORD_AUXTRACE_INFO not supported.");
55 }
56 ASSIGN_OR_RETURN(tokenizer_,
57 PerfTracker::GetOrCreate(context_)->CreateAuxDataTokenizer(
58 std::move(info)));
59 return base::OkStatus();
60 }
61
OnAuxRecord(AuxRecord aux)62 base::Status AuxStreamManager::OnAuxRecord(AuxRecord aux) {
63 if (!tokenizer_) {
64 return base::ErrStatus(
65 "PERF_RECORD_AUX without previous PERF_RECORD_AUXTRACE_INFO.");
66 }
67 ASSIGN_OR_RETURN(AuxStream & stream,
68 GetOrCreateStreamForSampleId(aux.sample_id));
69 return stream.OnAuxRecord(aux);
70 }
71
OnAuxtraceRecord(AuxtraceRecord auxtrace,TraceBlobView data)72 base::Status AuxStreamManager::OnAuxtraceRecord(AuxtraceRecord auxtrace,
73 TraceBlobView data) {
74 if (!tokenizer_) {
75 return base::ErrStatus(
76 "PERF_RECORD_AUXTRACE without previous PERF_RECORD_AUXTRACE_INFO.");
77 }
78 if (auxtrace.cpu == std::numeric_limits<uint32_t>::max()) {
79 // Aux data can be written by cpu or by tid. An unset cpu will have a value
80 // of UINT32_MAX. Be aware for an unset tid simpleperf uses 0 and perf uses
81 // UINT32_MAX. ¯\_(ツ)_/¯
82 // Deal just with per cpu data for now.
83 return base::ErrStatus(
84 "Aux data handling only implemented for per cpu data.");
85 }
86 ASSIGN_OR_RETURN(AuxStream & stream, GetOrCreateStreamForCpu(auxtrace.cpu));
87 return stream.OnAuxtraceRecord(std::move(auxtrace), std::move(data));
88 }
89
OnItraceStartRecord(ItraceStartRecord start)90 base::Status AuxStreamManager::OnItraceStartRecord(ItraceStartRecord start) {
91 ASSIGN_OR_RETURN(AuxStream & stream,
92 GetOrCreateStreamForSampleId(start.sample_id));
93 return stream.OnItraceStartRecord(std::move(start));
94 }
95
FinalizeStreams()96 base::Status AuxStreamManager::FinalizeStreams() {
97 for (auto it = auxdata_streams_by_cpu_.GetIterator(); it; ++it) {
98 RETURN_IF_ERROR(it.value()->NotifyEndOfStream());
99 }
100
101 return base::OkStatus();
102 }
103
104 base::StatusOr<std::reference_wrapper<AuxStream>>
GetOrCreateStreamForCpu(uint32_t cpu)105 AuxStreamManager::GetOrCreateStreamForCpu(uint32_t cpu) {
106 PERFETTO_CHECK(tokenizer_);
107 std::unique_ptr<AuxStream>* stream_ptr = auxdata_streams_by_cpu_.Find(cpu);
108 if (!stream_ptr) {
109 std::unique_ptr<AuxStream> stream(
110 new AuxStream(this, AuxStream::Type::kCpuBound, cpu));
111 ASSIGN_OR_RETURN(stream->data_stream_,
112 tokenizer_->InitializeAuxDataStream(stream.get()));
113 stream_ptr = auxdata_streams_by_cpu_.Insert(cpu, std::move(stream)).first;
114 }
115
116 return std::ref(**stream_ptr);
117 }
118
AuxStream(AuxStreamManager * manager,Type type,uint32_t tid_or_cpu)119 AuxStream::AuxStream(AuxStreamManager* manager, Type type, uint32_t tid_or_cpu)
120 : manager_(*manager), type_(type), tid_or_cpu_(tid_or_cpu) {}
121 AuxStream::~AuxStream() = default;
122
OnAuxRecord(AuxRecord aux)123 base::Status AuxStream::OnAuxRecord(AuxRecord aux) {
124 if (aux.offset < aux_end_) {
125 return base::ErrStatus("Overlapping AuxRecord. Got %" PRIu64
126 ", expected at least %" PRIu64,
127 aux.offset, aux_end_);
128 }
129 if (aux.offset > aux_end_) {
130 manager_.context()->storage->IncrementStats(
131 stats::perf_aux_missing, static_cast<int64_t>(aux.offset - aux_end_));
132 }
133 if (aux.flags & PERF_AUX_FLAG_TRUNCATED) {
134 manager_.context()->storage->IncrementStats(stats::perf_aux_truncated);
135 }
136 if (aux.flags & PERF_AUX_FLAG_PARTIAL) {
137 manager_.context()->storage->IncrementStats(stats::perf_aux_partial);
138 }
139 if (aux.flags & PERF_AUX_FLAG_COLLISION) {
140 manager_.context()->storage->IncrementStats(stats::perf_aux_collision);
141 }
142 outstanding_records_.emplace_back(std::move(aux));
143 aux_end_ = aux.end();
144 return MaybeParse();
145 }
146
OnAuxtraceRecord(AuxtraceRecord auxtrace,TraceBlobView data)147 base::Status AuxStream::OnAuxtraceRecord(AuxtraceRecord auxtrace,
148 TraceBlobView data) {
149 PERFETTO_CHECK(auxtrace.size == data.size());
150 if (auxtrace.offset < auxtrace_end_) {
151 return base::ErrStatus("Overlapping AuxtraceData");
152 }
153 if (auxtrace.offset > auxtrace_end_) {
154 manager_.context()->storage->IncrementStats(
155 stats::perf_auxtrace_missing,
156 static_cast<int64_t>(auxtrace.offset - auxtrace_end_));
157 }
158 outstanding_auxtrace_data_.emplace_back(std::move(auxtrace), std::move(data));
159 auxtrace_end_ = outstanding_auxtrace_data_.back().end();
160 return MaybeParse();
161 }
162
MaybeParse()163 base::Status AuxStream::MaybeParse() {
164 while (!outstanding_records_.empty()) {
165 auto& record = outstanding_records_.front();
166 if (auto r = std::get_if<ItraceStartRecord>(&record); r) {
167 RETURN_IF_ERROR(data_stream_->OnItraceStartRecord(std::move(*r)));
168 outstanding_records_.pop_front();
169 continue;
170 }
171
172 const AuxRecord* aux_record = std::get_if<AuxRecord>(&record);
173 PERFETTO_CHECK(aux_record);
174
175 if (aux_record->size == 0) {
176 outstanding_records_.pop_front();
177 continue;
178 }
179
180 if (outstanding_auxtrace_data_.empty()) {
181 break;
182 }
183
184 AuxtraceDataReader& auxtrace_data = outstanding_auxtrace_data_.front();
185
186 // We need both auxtrace and aux, so we start at the biggest offset and end
187 // at the smallest.
188 const uint64_t start_offset =
189 std::max(aux_record->offset, auxtrace_data.offset());
190 const uint64_t end_offset =
191 std::min(aux_record->end(), auxtrace_data.end());
192
193 if (start_offset >= auxtrace_data.end()) {
194 // No usefull data at the front of the queue
195 outstanding_auxtrace_data_.pop_front();
196 continue;
197 }
198 if (start_offset >= aux_record->end()) {
199 // No usefull data in this record
200 outstanding_records_.pop_front();
201 continue;
202 }
203 // We can not get this after the two check above.
204 PERFETTO_CHECK(start_offset < end_offset);
205
206 if (tokenizer_offset_ < start_offset) {
207 data_stream_->OnDataLoss(start_offset - tokenizer_offset_);
208 tokenizer_offset_ = start_offset;
209 }
210 PERFETTO_CHECK(tokenizer_offset_ == start_offset);
211
212 const uint64_t size = end_offset - start_offset;
213 auxtrace_data.DropUntil(start_offset);
214 TraceBlobView data = auxtrace_data.ConsumeFront(size);
215
216 AuxRecord adjusted_aux_record = *aux_record;
217 adjusted_aux_record.offset = tokenizer_offset_;
218 adjusted_aux_record.size = size;
219 tokenizer_offset_ += size;
220 RETURN_IF_ERROR(
221 data_stream_->Parse(std::move(adjusted_aux_record), std::move(data)));
222 }
223
224 return base::OkStatus();
225 }
226
OnItraceStartRecord(ItraceStartRecord start)227 base::Status AuxStream::OnItraceStartRecord(ItraceStartRecord start) {
228 if (outstanding_records_.empty()) {
229 return data_stream_->OnItraceStartRecord(std::move(start));
230 }
231 outstanding_records_.emplace_back(std::move(start));
232 return base::OkStatus();
233 }
234
NotifyEndOfStream()235 base::Status AuxStream::NotifyEndOfStream() {
236 for (; !outstanding_records_.empty(); outstanding_records_.pop_front()) {
237 auto& record = outstanding_records_.front();
238 if (auto* r = std::get_if<ItraceStartRecord>(&record); r) {
239 RETURN_IF_ERROR(data_stream_->OnItraceStartRecord(std::move(*r)));
240 }
241 }
242
243 if (aux_end_ < auxtrace_end_) {
244 manager_.context()->storage->IncrementStats(
245 stats::perf_aux_missing,
246 static_cast<int64_t>(auxtrace_end_ - aux_end_));
247 } else if (auxtrace_end_ < aux_end_) {
248 manager_.context()->storage->IncrementStats(
249 stats::perf_auxtrace_missing,
250 static_cast<int64_t>(aux_end_ - auxtrace_end_));
251 }
252
253 uint64_t end = std::max(aux_end_, auxtrace_end_);
254 if (tokenizer_offset_ < end) {
255 uint64_t loss = end - tokenizer_offset_;
256 data_stream_->OnDataLoss(loss);
257 tokenizer_offset_ += loss;
258 }
259 return data_stream_->NotifyEndOfStream();
260 }
261
AuxtraceDataReader(AuxtraceRecord auxtrace,TraceBlobView data)262 AuxStream::AuxtraceDataReader::AuxtraceDataReader(AuxtraceRecord auxtrace,
263 TraceBlobView data)
264 : offset_(auxtrace.offset), data_(std::move(data)) {
265 PERFETTO_CHECK(auxtrace.size == data_.size());
266 }
267
DropUntil(uint64_t offset)268 void AuxStream::AuxtraceDataReader::DropUntil(uint64_t offset) {
269 PERFETTO_CHECK(offset >= offset_ && offset <= end());
270 const uint64_t size = offset - offset_;
271
272 data_ = data_.slice_off(size, data_.size() - size);
273 offset_ += size;
274 }
275
ConsumeFront(uint64_t size)276 TraceBlobView AuxStream::AuxtraceDataReader::ConsumeFront(uint64_t size) {
277 PERFETTO_CHECK(size <= data_.size());
278 TraceBlobView res = data_.slice_off(0, size);
279 data_ = data_.slice_off(size, data_.size() - size);
280 offset_ += size;
281 return res;
282 }
283
284 } // namespace perfetto::trace_processor::perf_importer
285