xref: /aosp_15_r20/external/perfetto/src/trace_processor/importers/perf/aux_stream_manager.cc (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1 /*
2  * Copyright (C) 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/trace_processor/importers/perf/aux_stream_manager.h"
18 
#include <algorithm>
#include <cinttypes>
#include <cstdint>
#include <functional>
#include <limits>
#include <memory>
#include <optional>
#include <utility>
#include <variant>
25 
26 #include "perfetto/base/logging.h"
27 #include "perfetto/base/status.h"
28 #include "perfetto/trace_processor/trace_blob_view.h"
29 #include "src/trace_processor/importers/perf/aux_data_tokenizer.h"
30 #include "src/trace_processor/importers/perf/aux_record.h"
31 #include "src/trace_processor/importers/perf/auxtrace_info_record.h"
32 #include "src/trace_processor/importers/perf/auxtrace_record.h"
33 #include "src/trace_processor/importers/perf/itrace_start_record.h"
34 #include "src/trace_processor/importers/perf/perf_event.h"
35 #include "src/trace_processor/importers/perf/perf_tracker.h"
36 #include "src/trace_processor/storage/trace_storage.h"
37 #include "src/trace_processor/types/trace_processor_context.h"
38 #include "src/trace_processor/util/status_macros.h"
39 
40 namespace perfetto::trace_processor::perf_importer {
41 
42 base::StatusOr<std::reference_wrapper<AuxStream>>
GetOrCreateStreamForSampleId(const std::optional<SampleId> & sample_id)43 AuxStreamManager::GetOrCreateStreamForSampleId(
44     const std::optional<SampleId>& sample_id) {
45   if (!sample_id.has_value() || !sample_id->cpu().has_value()) {
46     return base::ErrStatus(
47         "Aux data handling only implemented for per cpu data.");
48   }
49   return GetOrCreateStreamForCpu(*sample_id->cpu());
50 }
51 
OnAuxtraceInfoRecord(AuxtraceInfoRecord info)52 base::Status AuxStreamManager::OnAuxtraceInfoRecord(AuxtraceInfoRecord info) {
53   if (tokenizer_) {
54     return base::ErrStatus("Multiple PERF_RECORD_AUXTRACE_INFO not supported.");
55   }
56   ASSIGN_OR_RETURN(tokenizer_,
57                    PerfTracker::GetOrCreate(context_)->CreateAuxDataTokenizer(
58                        std::move(info)));
59   return base::OkStatus();
60 }
61 
OnAuxRecord(AuxRecord aux)62 base::Status AuxStreamManager::OnAuxRecord(AuxRecord aux) {
63   if (!tokenizer_) {
64     return base::ErrStatus(
65         "PERF_RECORD_AUX without previous PERF_RECORD_AUXTRACE_INFO.");
66   }
67   ASSIGN_OR_RETURN(AuxStream & stream,
68                    GetOrCreateStreamForSampleId(aux.sample_id));
69   return stream.OnAuxRecord(aux);
70 }
71 
OnAuxtraceRecord(AuxtraceRecord auxtrace,TraceBlobView data)72 base::Status AuxStreamManager::OnAuxtraceRecord(AuxtraceRecord auxtrace,
73                                                 TraceBlobView data) {
74   if (!tokenizer_) {
75     return base::ErrStatus(
76         "PERF_RECORD_AUXTRACE without previous PERF_RECORD_AUXTRACE_INFO.");
77   }
78   if (auxtrace.cpu == std::numeric_limits<uint32_t>::max()) {
79     // Aux data can be written by cpu or by tid. An unset cpu will have a value
80     // of UINT32_MAX. Be aware for an unset tid simpleperf uses 0 and perf uses
81     // UINT32_MAX. ¯\_(ツ)_/¯
82     // Deal just with per cpu data for now.
83     return base::ErrStatus(
84         "Aux data handling only implemented for per cpu data.");
85   }
86   ASSIGN_OR_RETURN(AuxStream & stream, GetOrCreateStreamForCpu(auxtrace.cpu));
87   return stream.OnAuxtraceRecord(std::move(auxtrace), std::move(data));
88 }
89 
OnItraceStartRecord(ItraceStartRecord start)90 base::Status AuxStreamManager::OnItraceStartRecord(ItraceStartRecord start) {
91   ASSIGN_OR_RETURN(AuxStream & stream,
92                    GetOrCreateStreamForSampleId(start.sample_id));
93   return stream.OnItraceStartRecord(std::move(start));
94 }
95 
FinalizeStreams()96 base::Status AuxStreamManager::FinalizeStreams() {
97   for (auto it = auxdata_streams_by_cpu_.GetIterator(); it; ++it) {
98     RETURN_IF_ERROR(it.value()->NotifyEndOfStream());
99   }
100 
101   return base::OkStatus();
102 }
103 
104 base::StatusOr<std::reference_wrapper<AuxStream>>
GetOrCreateStreamForCpu(uint32_t cpu)105 AuxStreamManager::GetOrCreateStreamForCpu(uint32_t cpu) {
106   PERFETTO_CHECK(tokenizer_);
107   std::unique_ptr<AuxStream>* stream_ptr = auxdata_streams_by_cpu_.Find(cpu);
108   if (!stream_ptr) {
109     std::unique_ptr<AuxStream> stream(
110         new AuxStream(this, AuxStream::Type::kCpuBound, cpu));
111     ASSIGN_OR_RETURN(stream->data_stream_,
112                      tokenizer_->InitializeAuxDataStream(stream.get()));
113     stream_ptr = auxdata_streams_by_cpu_.Insert(cpu, std::move(stream)).first;
114   }
115 
116   return std::ref(**stream_ptr);
117 }
118 
AuxStream(AuxStreamManager * manager,Type type,uint32_t tid_or_cpu)119 AuxStream::AuxStream(AuxStreamManager* manager, Type type, uint32_t tid_or_cpu)
120     : manager_(*manager), type_(type), tid_or_cpu_(tid_or_cpu) {}
121 AuxStream::~AuxStream() = default;
122 
OnAuxRecord(AuxRecord aux)123 base::Status AuxStream::OnAuxRecord(AuxRecord aux) {
124   if (aux.offset < aux_end_) {
125     return base::ErrStatus("Overlapping AuxRecord. Got %" PRIu64
126                            ", expected at least %" PRIu64,
127                            aux.offset, aux_end_);
128   }
129   if (aux.offset > aux_end_) {
130     manager_.context()->storage->IncrementStats(
131         stats::perf_aux_missing, static_cast<int64_t>(aux.offset - aux_end_));
132   }
133   if (aux.flags & PERF_AUX_FLAG_TRUNCATED) {
134     manager_.context()->storage->IncrementStats(stats::perf_aux_truncated);
135   }
136   if (aux.flags & PERF_AUX_FLAG_PARTIAL) {
137     manager_.context()->storage->IncrementStats(stats::perf_aux_partial);
138   }
139   if (aux.flags & PERF_AUX_FLAG_COLLISION) {
140     manager_.context()->storage->IncrementStats(stats::perf_aux_collision);
141   }
142   outstanding_records_.emplace_back(std::move(aux));
143   aux_end_ = aux.end();
144   return MaybeParse();
145 }
146 
OnAuxtraceRecord(AuxtraceRecord auxtrace,TraceBlobView data)147 base::Status AuxStream::OnAuxtraceRecord(AuxtraceRecord auxtrace,
148                                          TraceBlobView data) {
149   PERFETTO_CHECK(auxtrace.size == data.size());
150   if (auxtrace.offset < auxtrace_end_) {
151     return base::ErrStatus("Overlapping AuxtraceData");
152   }
153   if (auxtrace.offset > auxtrace_end_) {
154     manager_.context()->storage->IncrementStats(
155         stats::perf_auxtrace_missing,
156         static_cast<int64_t>(auxtrace.offset - auxtrace_end_));
157   }
158   outstanding_auxtrace_data_.emplace_back(std::move(auxtrace), std::move(data));
159   auxtrace_end_ = outstanding_auxtrace_data_.back().end();
160   return MaybeParse();
161 }
162 
MaybeParse()163 base::Status AuxStream::MaybeParse() {
164   while (!outstanding_records_.empty()) {
165     auto& record = outstanding_records_.front();
166     if (auto r = std::get_if<ItraceStartRecord>(&record); r) {
167       RETURN_IF_ERROR(data_stream_->OnItraceStartRecord(std::move(*r)));
168       outstanding_records_.pop_front();
169       continue;
170     }
171 
172     const AuxRecord* aux_record = std::get_if<AuxRecord>(&record);
173     PERFETTO_CHECK(aux_record);
174 
175     if (aux_record->size == 0) {
176       outstanding_records_.pop_front();
177       continue;
178     }
179 
180     if (outstanding_auxtrace_data_.empty()) {
181       break;
182     }
183 
184     AuxtraceDataReader& auxtrace_data = outstanding_auxtrace_data_.front();
185 
186     // We need both auxtrace and aux, so we start at the biggest offset and end
187     // at the smallest.
188     const uint64_t start_offset =
189         std::max(aux_record->offset, auxtrace_data.offset());
190     const uint64_t end_offset =
191         std::min(aux_record->end(), auxtrace_data.end());
192 
193     if (start_offset >= auxtrace_data.end()) {
194       // No usefull data at the front of the queue
195       outstanding_auxtrace_data_.pop_front();
196       continue;
197     }
198     if (start_offset >= aux_record->end()) {
199       // No usefull data in this record
200       outstanding_records_.pop_front();
201       continue;
202     }
203     // We can not get this after the two check above.
204     PERFETTO_CHECK(start_offset < end_offset);
205 
206     if (tokenizer_offset_ < start_offset) {
207       data_stream_->OnDataLoss(start_offset - tokenizer_offset_);
208       tokenizer_offset_ = start_offset;
209     }
210     PERFETTO_CHECK(tokenizer_offset_ == start_offset);
211 
212     const uint64_t size = end_offset - start_offset;
213     auxtrace_data.DropUntil(start_offset);
214     TraceBlobView data = auxtrace_data.ConsumeFront(size);
215 
216     AuxRecord adjusted_aux_record = *aux_record;
217     adjusted_aux_record.offset = tokenizer_offset_;
218     adjusted_aux_record.size = size;
219     tokenizer_offset_ += size;
220     RETURN_IF_ERROR(
221         data_stream_->Parse(std::move(adjusted_aux_record), std::move(data)));
222   }
223 
224   return base::OkStatus();
225 }
226 
OnItraceStartRecord(ItraceStartRecord start)227 base::Status AuxStream::OnItraceStartRecord(ItraceStartRecord start) {
228   if (outstanding_records_.empty()) {
229     return data_stream_->OnItraceStartRecord(std::move(start));
230   }
231   outstanding_records_.emplace_back(std::move(start));
232   return base::OkStatus();
233 }
234 
NotifyEndOfStream()235 base::Status AuxStream::NotifyEndOfStream() {
236   for (; !outstanding_records_.empty(); outstanding_records_.pop_front()) {
237     auto& record = outstanding_records_.front();
238     if (auto* r = std::get_if<ItraceStartRecord>(&record); r) {
239       RETURN_IF_ERROR(data_stream_->OnItraceStartRecord(std::move(*r)));
240     }
241   }
242 
243   if (aux_end_ < auxtrace_end_) {
244     manager_.context()->storage->IncrementStats(
245         stats::perf_aux_missing,
246         static_cast<int64_t>(auxtrace_end_ - aux_end_));
247   } else if (auxtrace_end_ < aux_end_) {
248     manager_.context()->storage->IncrementStats(
249         stats::perf_auxtrace_missing,
250         static_cast<int64_t>(aux_end_ - auxtrace_end_));
251   }
252 
253   uint64_t end = std::max(aux_end_, auxtrace_end_);
254   if (tokenizer_offset_ < end) {
255     uint64_t loss = end - tokenizer_offset_;
256     data_stream_->OnDataLoss(loss);
257     tokenizer_offset_ += loss;
258   }
259   return data_stream_->NotifyEndOfStream();
260 }
261 
AuxtraceDataReader(AuxtraceRecord auxtrace,TraceBlobView data)262 AuxStream::AuxtraceDataReader::AuxtraceDataReader(AuxtraceRecord auxtrace,
263                                                   TraceBlobView data)
264     : offset_(auxtrace.offset), data_(std::move(data)) {
265   PERFETTO_CHECK(auxtrace.size == data_.size());
266 }
267 
DropUntil(uint64_t offset)268 void AuxStream::AuxtraceDataReader::DropUntil(uint64_t offset) {
269   PERFETTO_CHECK(offset >= offset_ && offset <= end());
270   const uint64_t size = offset - offset_;
271 
272   data_ = data_.slice_off(size, data_.size() - size);
273   offset_ += size;
274 }
275 
ConsumeFront(uint64_t size)276 TraceBlobView AuxStream::AuxtraceDataReader::ConsumeFront(uint64_t size) {
277   PERFETTO_CHECK(size <= data_.size());
278   TraceBlobView res = data_.slice_off(0, size);
279   data_ = data_.slice_off(size, data_.size() - size);
280   offset_ += size;
281   return res;
282 }
283 
284 }  // namespace perfetto::trace_processor::perf_importer
285