1 /*
2 * Copyright (C) 2020 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/trace_processor/perfetto_sql/intrinsics/table_functions/connected_flow.h"
18
19 #include <cinttypes>
20 #include <cstddef>
21 #include <cstdint>
22 #include <memory>
23 #include <queue>
24 #include <set>
25 #include <string>
26 #include <utility>
27 #include <vector>
28
29 #include "perfetto/base/logging.h"
30 #include "perfetto/base/status.h"
31 #include "perfetto/ext/base/status_or.h"
32 #include "perfetto/trace_processor/basic_types.h"
33 #include "src/trace_processor/db/column/types.h"
34 #include "src/trace_processor/db/column_storage.h"
35 #include "src/trace_processor/db/table.h"
36 #include "src/trace_processor/db/typed_column.h"
37 #include "src/trace_processor/perfetto_sql/intrinsics/table_functions/ancestor.h"
38 #include "src/trace_processor/perfetto_sql/intrinsics/table_functions/descendant.h"
39 #include "src/trace_processor/perfetto_sql/intrinsics/table_functions/tables_py.h"
40 #include "src/trace_processor/storage/trace_storage.h"
41 #include "src/trace_processor/tables/flow_tables_py.h"
42 #include "src/trace_processor/tables/slice_tables_py.h"
43 #include "src/trace_processor/types/trace_processor_context.h"
44
45 namespace perfetto::trace_processor {
46 namespace tables {
47
// Destructor of the generated ConnectedFlowTable, defined out of line in this
// translation unit with the compiler-generated (defaulted) behaviour.
ConnectedFlowTable::~ConnectedFlowTable() = default;
49
50 } // namespace tables
51
ConnectedFlow(Mode mode,const TraceStorage * storage)52 ConnectedFlow::ConnectedFlow(Mode mode, const TraceStorage* storage)
53 : mode_(mode), storage_(storage) {}
54
// Out-of-line destructor with the compiler-generated (defaulted) behaviour.
ConnectedFlow::~ConnectedFlow() = default;
56
57 namespace {
58
// Bitmask selecting which flow directions the search follows from a slice.
enum FlowVisitMode : uint8_t {
  // Follow flows that arrive at the slice being visited.
  VISIT_INCOMING = 0x01,
  // Follow flows that leave the slice being visited.
  VISIT_OUTGOING = 0x02,
  // Follow flows in both directions.
  VISIT_INCOMING_AND_OUTGOING = VISIT_OUTGOING | VISIT_INCOMING,
};
64
// Bitmask selecting which relatives of a slice are added to the search.
enum RelativesVisitMode : uint8_t {
  // Do not add any relatives.
  VISIT_NO_RELATIVES = 0x00,
  // Add the ancestors of the slice.
  VISIT_ANCESTORS = 0x01,
  // Add the descendants of the slice.
  VISIT_DESCENDANTS = 0x02,
  // Add both ancestors and descendants.
  VISIT_ALL_RELATIVES = VISIT_DESCENDANTS | VISIT_ANCESTORS,
};
71
72 // Searches through the slice table recursively to find connected flows.
73 // Usage:
74 // BFS bfs = BFS(context);
75 // bfs
76 // // Add list of slices to start with.
77 // .Start(start_id).Start(start_id2)
78 // // Additionally include relatives of |another_id| in search space.
79 // .GoToRelatives(another_id, VISIT_ANCESTORS)
80 // // Visit all connected slices to the above slices.
81 // .VisitAll(VISIT_INCOMING, VISIT_NO_RELATIVES);
82 //
83 // bfs.TakeResultingFlows();
84 class BFS {
85 public:
BFS(const TraceStorage * storage)86 explicit BFS(const TraceStorage* storage) : storage_(storage) {}
87
TakeResultingFlows()88 std::vector<tables::FlowTable::RowNumber> TakeResultingFlows() && {
89 return std::move(flow_rows_);
90 }
91
92 // Includes a starting slice ID to search.
Start(SliceId start_id)93 BFS& Start(SliceId start_id) {
94 slices_to_visit_.push({start_id, VisitType::START});
95 known_slices_.insert(start_id);
96 return *this;
97 }
98
99 // Visits all slices that can be reached from the given starting slices.
VisitAll(FlowVisitMode visit_flow,RelativesVisitMode visit_relatives)100 void VisitAll(FlowVisitMode visit_flow, RelativesVisitMode visit_relatives) {
101 while (!slices_to_visit_.empty()) {
102 SliceId slice_id = slices_to_visit_.front().first;
103 VisitType visit_type = slices_to_visit_.front().second;
104 slices_to_visit_.pop();
105
106 // If the given slice is being visited due to being ancestor or descendant
107 // of a previous one, do not compute ancestors or descendants again as the
108 // result is going to be the same.
109 if (visit_type != VisitType::VIA_RELATIVE) {
110 GoToRelatives(slice_id, visit_relatives);
111 }
112
113 // If the slice was visited by a flow, do not try to go back.
114 if ((visit_flow & VISIT_INCOMING) &&
115 visit_type != VisitType::VIA_OUTGOING_FLOW) {
116 GoByFlow(slice_id, FlowDirection::INCOMING);
117 }
118 if ((visit_flow & VISIT_OUTGOING) &&
119 visit_type != VisitType::VIA_INCOMING_FLOW) {
120 GoByFlow(slice_id, FlowDirection::OUTGOING);
121 }
122 }
123 }
124
125 // Includes the relatives of |slice_id| to the list of slices to visit.
GoToRelatives(SliceId slice_id,RelativesVisitMode visit_relatives)126 BFS& GoToRelatives(SliceId slice_id, RelativesVisitMode visit_relatives) {
127 const auto& slice_table = storage_->slice_table();
128 if (visit_relatives & VISIT_ANCESTORS) {
129 auto opt_ancestors = Ancestor::GetAncestorSlices(slice_table, slice_id);
130 if (opt_ancestors)
131 GoToRelativesImpl(*opt_ancestors);
132 }
133 if (visit_relatives & VISIT_DESCENDANTS) {
134 auto opt_descendants =
135 Descendant::GetDescendantSlices(slice_table, slice_id);
136 if (opt_descendants)
137 GoToRelativesImpl(*opt_descendants);
138 }
139 return *this;
140 }
141
142 private:
143 enum class FlowDirection {
144 INCOMING,
145 OUTGOING,
146 };
147
148 enum class VisitType {
149 START,
150 VIA_INCOMING_FLOW,
151 VIA_OUTGOING_FLOW,
152 VIA_RELATIVE,
153 };
154
GoByFlow(SliceId slice_id,FlowDirection flow_direction)155 void GoByFlow(SliceId slice_id, FlowDirection flow_direction) {
156 PERFETTO_DCHECK(known_slices_.count(slice_id) != 0);
157
158 const auto& flow = storage_->flow_table();
159
160 const TypedColumn<SliceId>& start_col =
161 flow_direction == FlowDirection::OUTGOING ? flow.slice_out()
162 : flow.slice_in();
163 Query q;
164 q.constraints = {start_col.eq(slice_id.value)};
165 auto it = flow.FilterToIterator(q);
166 for (; it; ++it) {
167 flow_rows_.push_back(it.row_number());
168
169 SliceId next_slice_id = flow_direction == FlowDirection::OUTGOING
170 ? it.slice_in()
171 : it.slice_out();
172 if (known_slices_.count(next_slice_id))
173 continue;
174
175 known_slices_.insert(next_slice_id);
176 slices_to_visit_.push(
177 {next_slice_id, flow_direction == FlowDirection::INCOMING
178 ? VisitType::VIA_INCOMING_FLOW
179 : VisitType::VIA_OUTGOING_FLOW});
180 }
181 }
182
GoToRelativesImpl(const std::vector<tables::SliceTable::RowNumber> & rows)183 void GoToRelativesImpl(
184 const std::vector<tables::SliceTable::RowNumber>& rows) {
185 const auto& slice = storage_->slice_table();
186 for (tables::SliceTable::RowNumber row : rows) {
187 auto relative_slice_id = row.ToRowReference(slice).id();
188 if (known_slices_.count(relative_slice_id))
189 continue;
190 known_slices_.insert(relative_slice_id);
191 slices_to_visit_.push({relative_slice_id, VisitType::VIA_RELATIVE});
192 }
193 }
194
195 std::queue<std::pair<SliceId, VisitType>> slices_to_visit_;
196 std::set<SliceId> known_slices_;
197 std::vector<tables::FlowTable::RowNumber> flow_rows_;
198
199 const TraceStorage* storage_;
200 };
201
202 } // namespace
203
ComputeTable(const std::vector<SqlValue> & arguments)204 base::StatusOr<std::unique_ptr<Table>> ConnectedFlow::ComputeTable(
205 const std::vector<SqlValue>& arguments) {
206 PERFETTO_CHECK(arguments.size() == 1);
207
208 const auto& flow = storage_->flow_table();
209 const auto& slice = storage_->slice_table();
210
211 if (arguments[0].type == SqlValue::Type::kNull) {
212 // Nothing matches a null id so return an empty table.
213 return std::unique_ptr<Table>(
214 tables::ConnectedFlowTable::SelectAndExtendParent(flow, {}, {}));
215 }
216 if (arguments[0].type != SqlValue::Type::kLong) {
217 return base::ErrStatus("start id should be an integer.");
218 }
219
220 SliceId start_id{static_cast<uint32_t>(arguments[0].AsLong())};
221 if (!slice.FindById(start_id)) {
222 return base::ErrStatus("invalid slice id %" PRIu32 "",
223 static_cast<uint32_t>(start_id.value));
224 }
225
226 BFS bfs(storage_);
227 switch (mode_) {
228 case Mode::kDirectlyConnectedFlow:
229 bfs.Start(start_id).VisitAll(VISIT_INCOMING_AND_OUTGOING,
230 VISIT_NO_RELATIVES);
231 break;
232 case Mode::kFollowingFlow:
233 bfs.Start(start_id).VisitAll(VISIT_OUTGOING, VISIT_DESCENDANTS);
234 break;
235 case Mode::kPrecedingFlow:
236 bfs.Start(start_id).VisitAll(VISIT_INCOMING, VISIT_ANCESTORS);
237 break;
238 }
239
240 std::vector<tables::FlowTable::RowNumber> result_rows =
241 std::move(bfs).TakeResultingFlows();
242
243 // Aditional column for start_id
244 ColumnStorage<uint32_t> start_ids;
245 for (size_t i = 0; i < result_rows.size(); i++) {
246 start_ids.Append(start_id.value);
247 }
248 return std::unique_ptr<Table>(
249 tables::ConnectedFlowTable::SelectAndExtendParent(flow, result_rows,
250 std::move(start_ids)));
251 }
252
CreateSchema()253 Table::Schema ConnectedFlow::CreateSchema() {
254 return tables::ConnectedFlowTable::ComputeStaticSchema();
255 }
256
TableName()257 std::string ConnectedFlow::TableName() {
258 switch (mode_) {
259 case Mode::kDirectlyConnectedFlow:
260 return "directly_connected_flow";
261 case Mode::kFollowingFlow:
262 return "following_flow";
263 case Mode::kPrecedingFlow:
264 return "preceding_flow";
265 }
266 PERFETTO_FATAL("Unexpected ConnectedFlowType");
267 }
268
EstimateRowCount()269 uint32_t ConnectedFlow::EstimateRowCount() {
270 return 1;
271 }
272 } // namespace perfetto::trace_processor
273