1 // Copyright (C) 2023 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/trace_processor/perfetto_sql/intrinsics/functions/layout_functions.h"
16
17 #include <cstddef>
18 #include <cstdint>
19 #include <limits>
20 #include <queue>
21 #include <vector>
22 #include "perfetto/base/logging.h"
23 #include "perfetto/base/status.h"
24 #include "perfetto/ext/base/status_or.h"
25 #include "perfetto/trace_processor/basic_types.h"
26 #include "src/trace_processor/perfetto_sql/engine/perfetto_sql_engine.h"
27 #include "src/trace_processor/sqlite/bindings/sqlite_result.h"
28 #include "src/trace_processor/sqlite/bindings/sqlite_window_function.h"
29 #include "src/trace_processor/sqlite/sqlite_utils.h"
30 #include "src/trace_processor/util/status_macros.h"
31
32 namespace perfetto::trace_processor {
33
34 namespace {
35
36 constexpr char kFunctionName[] = "INTERNAL_LAYOUT";
37
38 // A helper class for tracking which depths are available at a given time
39 // and which slices are occupying each depths.
40 class SlicePacker {
41 public:
42 SlicePacker() = default;
43
44 // |dur| can be 0 for instant events and -1 for slices which do not end.
AddSlice(int64_t ts,int64_t dur)45 base::Status AddSlice(int64_t ts, int64_t dur) {
46 if (last_call_ == LastCall::kAddSlice) {
47 return base::ErrStatus(R"(
48 Incorrect window clause (observed two consecutive calls to "step" function).
49 The window clause should be "rows between unbounded preceding and current row".
50 )");
51 }
52 last_call_ = LastCall::kAddSlice;
53 if (ts < last_seen_ts_) {
54 return base::ErrStatus(R"(
55 Passed slices are in incorrect order: %s requires timestamps to be sorted.
56 Please specify "ORDER BY ts" in the window clause.
57 )",
58 kFunctionName);
59 }
60 last_seen_ts_ = ts;
61 ProcessPrecedingEvents(ts);
62 // If the event is instant, do not mark this depth as occupied as it
63 // becomes immediately available again.
64 bool is_busy = dur != 0;
65 size_t depth = SelectAvailableDepth(is_busy);
66 // If the slice has an end and is not an instant, schedule this depth
67 // to be marked available again when it ends.
68 if (dur != 0) {
69 int64_t ts_end =
70 dur == -1 ? std::numeric_limits<int64_t>::max() : ts + dur;
71 slice_ends_.push({ts_end, depth});
72 }
73 last_depth_ = depth;
74 return base::OkStatus();
75 }
76
GetLastDepth()77 size_t GetLastDepth() {
78 last_call_ = LastCall::kQuery;
79 return last_depth_;
80 }
81
82 private:
83 struct SliceEnd {
84 int64_t ts;
85 size_t depth;
86 };
87
88 struct SliceEndGreater {
operator ()perfetto::trace_processor::__anon3bbd39e10111::SlicePacker::SliceEndGreater89 bool operator()(const SliceEnd& lhs, const SliceEnd& rhs) {
90 return lhs.ts > rhs.ts;
91 }
92 };
93
ProcessPrecedingEvents(int64_t ts)94 void ProcessPrecedingEvents(int64_t ts) {
95 while (!slice_ends_.empty() && slice_ends_.top().ts <= ts) {
96 is_depth_busy_[slice_ends_.top().depth] = false;
97 slice_ends_.pop();
98 }
99 }
100
SelectAvailableDepth(bool new_state)101 size_t SelectAvailableDepth(bool new_state) {
102 for (size_t i = 0; i < is_depth_busy_.size(); ++i) {
103 if (!is_depth_busy_[i]) {
104 is_depth_busy_[i] = new_state;
105 return i;
106 }
107 }
108 size_t depth = is_depth_busy_.size();
109 is_depth_busy_.push_back(new_state);
110 return depth;
111 }
112
113 enum class LastCall {
114 kAddSlice,
115 kQuery,
116 };
117 // The first call will be "add slice" and the calls are expected to
118 // interleave, so set initial value to "query".
119 LastCall last_call_ = LastCall::kQuery;
120
121 int64_t last_seen_ts_ = 0;
122 std::vector<bool> is_depth_busy_;
123 // A list of currently open slices, ordered by end timestamp (ascending).
124 std::priority_queue<SliceEnd, std::vector<SliceEnd>, SliceEndGreater>
125 slice_ends_;
126 size_t last_depth_ = 0;
127 };
128
GetOrCreateAggregationContext(sqlite3_context * ctx)129 base::StatusOr<SlicePacker*> GetOrCreateAggregationContext(
130 sqlite3_context* ctx) {
131 auto** packer = static_cast<SlicePacker**>(
132 sqlite3_aggregate_context(ctx, sizeof(SlicePacker*)));
133 if (!packer) {
134 return base::ErrStatus("Failed to allocate aggregate context");
135 }
136
137 if (!*packer) {
138 *packer = new SlicePacker();
139 }
140 return *packer;
141 }
142
StepStatus(sqlite3_context * ctx,size_t argc,sqlite3_value ** argv)143 base::Status StepStatus(sqlite3_context* ctx,
144 size_t argc,
145 sqlite3_value** argv) {
146 base::StatusOr<SlicePacker*> slice_packer =
147 GetOrCreateAggregationContext(ctx);
148 RETURN_IF_ERROR(slice_packer.status());
149
150 ASSIGN_OR_RETURN(SqlValue ts, sqlite::utils::ExtractArgument(
151 argc, argv, "ts", 0, SqlValue::kLong));
152 if (ts.AsLong() < 0) {
153 return base::ErrStatus("ts cannot be negative.");
154 }
155
156 ASSIGN_OR_RETURN(SqlValue dur, sqlite::utils::ExtractArgument(
157 argc, argv, "dur", 1, SqlValue::kLong));
158 if (dur.AsLong() < -1) {
159 return base::ErrStatus("dur cannot be < -1.");
160 }
161
162 return slice_packer.value()->AddSlice(ts.AsLong(), dur.AsLong());
163 }
164
165 struct InternalLayout : public SqliteWindowFunction {
Stepperfetto::trace_processor::__anon3bbd39e10111::InternalLayout166 static void Step(sqlite3_context* ctx, int argc, sqlite3_value** argv) {
167 PERFETTO_CHECK(argc >= 0);
168
169 base::Status status = StepStatus(ctx, static_cast<size_t>(argc), argv);
170 if (!status.ok()) {
171 return sqlite::utils::SetError(ctx, kFunctionName, status);
172 }
173 }
174
Inverseperfetto::trace_processor::__anon3bbd39e10111::InternalLayout175 static void Inverse(sqlite3_context* ctx, int, sqlite3_value**) {
176 sqlite::utils::SetError(ctx, kFunctionName, base::ErrStatus(R"(
177 The inverse step is not supported: the window clause should be
178 "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW".
179 )"));
180 }
181
Valueperfetto::trace_processor::__anon3bbd39e10111::InternalLayout182 static void Value(sqlite3_context* ctx) {
183 base::StatusOr<SlicePacker*> slice_packer =
184 GetOrCreateAggregationContext(ctx);
185 if (!slice_packer.ok()) {
186 return sqlite::utils::SetError(ctx, kFunctionName, slice_packer.status());
187 }
188 return sqlite::result::Long(
189 ctx, static_cast<int64_t>(slice_packer.value()->GetLastDepth()));
190 }
191
Finalperfetto::trace_processor::__anon3bbd39e10111::InternalLayout192 static void Final(sqlite3_context* ctx) {
193 auto** slice_packer = static_cast<SlicePacker**>(
194 sqlite3_aggregate_context(ctx, sizeof(SlicePacker*)));
195 if (!slice_packer || !*slice_packer) {
196 return;
197 }
198 sqlite::result::Long(ctx,
199 static_cast<int64_t>((*slice_packer)->GetLastDepth()));
200 delete *slice_packer;
201 }
202 };
203
204 } // namespace
205
RegisterLayoutFunctions(PerfettoSqlEngine & engine)206 base::Status RegisterLayoutFunctions(PerfettoSqlEngine& engine) {
207 return engine.RegisterSqliteWindowFunction<InternalLayout>(kFunctionName, 2,
208 nullptr);
209 }
210
211 } // namespace perfetto::trace_processor
212