1 // Copyright (C) 2023 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/trace_processor/perfetto_sql/intrinsics/functions/layout_functions.h"
16 
17 #include <cstddef>
18 #include <cstdint>
19 #include <limits>
20 #include <queue>
21 #include <vector>
22 #include "perfetto/base/logging.h"
23 #include "perfetto/base/status.h"
24 #include "perfetto/ext/base/status_or.h"
25 #include "perfetto/trace_processor/basic_types.h"
26 #include "src/trace_processor/perfetto_sql/engine/perfetto_sql_engine.h"
27 #include "src/trace_processor/sqlite/bindings/sqlite_result.h"
28 #include "src/trace_processor/sqlite/bindings/sqlite_window_function.h"
29 #include "src/trace_processor/sqlite/sqlite_utils.h"
30 #include "src/trace_processor/util/status_macros.h"
31 
32 namespace perfetto::trace_processor {
33 
34 namespace {
35 
// Name under which the layout window function is registered with the SQL
// engine. It is also passed to SetError() and embedded in error messages.
constexpr char kFunctionName[] = "INTERNAL_LAYOUT";
37 
38 // A helper class for tracking which depths are available at a given time
39 // and which slices are occupying each depths.
40 class SlicePacker {
41  public:
42   SlicePacker() = default;
43 
44   // |dur| can be 0 for instant events and -1 for slices which do not end.
AddSlice(int64_t ts,int64_t dur)45   base::Status AddSlice(int64_t ts, int64_t dur) {
46     if (last_call_ == LastCall::kAddSlice) {
47       return base::ErrStatus(R"(
48 Incorrect window clause (observed two consecutive calls to "step" function).
49 The window clause should be "rows between unbounded preceding and current row".
50 )");
51     }
52     last_call_ = LastCall::kAddSlice;
53     if (ts < last_seen_ts_) {
54       return base::ErrStatus(R"(
55 Passed slices are in incorrect order: %s requires timestamps to be sorted.
56 Please specify "ORDER BY ts" in the window clause.
57 )",
58                              kFunctionName);
59     }
60     last_seen_ts_ = ts;
61     ProcessPrecedingEvents(ts);
62     // If the event is instant, do not mark this depth as occupied as it
63     // becomes immediately available again.
64     bool is_busy = dur != 0;
65     size_t depth = SelectAvailableDepth(is_busy);
66     // If the slice has an end and is not an instant, schedule this depth
67     // to be marked available again when it ends.
68     if (dur != 0) {
69       int64_t ts_end =
70           dur == -1 ? std::numeric_limits<int64_t>::max() : ts + dur;
71       slice_ends_.push({ts_end, depth});
72     }
73     last_depth_ = depth;
74     return base::OkStatus();
75   }
76 
GetLastDepth()77   size_t GetLastDepth() {
78     last_call_ = LastCall::kQuery;
79     return last_depth_;
80   }
81 
82  private:
83   struct SliceEnd {
84     int64_t ts;
85     size_t depth;
86   };
87 
88   struct SliceEndGreater {
operator ()perfetto::trace_processor::__anon3bbd39e10111::SlicePacker::SliceEndGreater89     bool operator()(const SliceEnd& lhs, const SliceEnd& rhs) {
90       return lhs.ts > rhs.ts;
91     }
92   };
93 
ProcessPrecedingEvents(int64_t ts)94   void ProcessPrecedingEvents(int64_t ts) {
95     while (!slice_ends_.empty() && slice_ends_.top().ts <= ts) {
96       is_depth_busy_[slice_ends_.top().depth] = false;
97       slice_ends_.pop();
98     }
99   }
100 
SelectAvailableDepth(bool new_state)101   size_t SelectAvailableDepth(bool new_state) {
102     for (size_t i = 0; i < is_depth_busy_.size(); ++i) {
103       if (!is_depth_busy_[i]) {
104         is_depth_busy_[i] = new_state;
105         return i;
106       }
107     }
108     size_t depth = is_depth_busy_.size();
109     is_depth_busy_.push_back(new_state);
110     return depth;
111   }
112 
113   enum class LastCall {
114     kAddSlice,
115     kQuery,
116   };
117   // The first call will be "add slice" and the calls are expected to
118   // interleave, so set initial value to "query".
119   LastCall last_call_ = LastCall::kQuery;
120 
121   int64_t last_seen_ts_ = 0;
122   std::vector<bool> is_depth_busy_;
123   // A list of currently open slices, ordered by end timestamp (ascending).
124   std::priority_queue<SliceEnd, std::vector<SliceEnd>, SliceEndGreater>
125       slice_ends_;
126   size_t last_depth_ = 0;
127 };
128 
GetOrCreateAggregationContext(sqlite3_context * ctx)129 base::StatusOr<SlicePacker*> GetOrCreateAggregationContext(
130     sqlite3_context* ctx) {
131   auto** packer = static_cast<SlicePacker**>(
132       sqlite3_aggregate_context(ctx, sizeof(SlicePacker*)));
133   if (!packer) {
134     return base::ErrStatus("Failed to allocate aggregate context");
135   }
136 
137   if (!*packer) {
138     *packer = new SlicePacker();
139   }
140   return *packer;
141 }
142 
StepStatus(sqlite3_context * ctx,size_t argc,sqlite3_value ** argv)143 base::Status StepStatus(sqlite3_context* ctx,
144                         size_t argc,
145                         sqlite3_value** argv) {
146   base::StatusOr<SlicePacker*> slice_packer =
147       GetOrCreateAggregationContext(ctx);
148   RETURN_IF_ERROR(slice_packer.status());
149 
150   ASSIGN_OR_RETURN(SqlValue ts, sqlite::utils::ExtractArgument(
151                                     argc, argv, "ts", 0, SqlValue::kLong));
152   if (ts.AsLong() < 0) {
153     return base::ErrStatus("ts cannot be negative.");
154   }
155 
156   ASSIGN_OR_RETURN(SqlValue dur, sqlite::utils::ExtractArgument(
157                                      argc, argv, "dur", 1, SqlValue::kLong));
158   if (dur.AsLong() < -1) {
159     return base::ErrStatus("dur cannot be < -1.");
160   }
161 
162   return slice_packer.value()->AddSlice(ts.AsLong(), dur.AsLong());
163 }
164 
165 struct InternalLayout : public SqliteWindowFunction {
Stepperfetto::trace_processor::__anon3bbd39e10111::InternalLayout166   static void Step(sqlite3_context* ctx, int argc, sqlite3_value** argv) {
167     PERFETTO_CHECK(argc >= 0);
168 
169     base::Status status = StepStatus(ctx, static_cast<size_t>(argc), argv);
170     if (!status.ok()) {
171       return sqlite::utils::SetError(ctx, kFunctionName, status);
172     }
173   }
174 
Inverseperfetto::trace_processor::__anon3bbd39e10111::InternalLayout175   static void Inverse(sqlite3_context* ctx, int, sqlite3_value**) {
176     sqlite::utils::SetError(ctx, kFunctionName, base::ErrStatus(R"(
177 The inverse step is not supported: the window clause should be
178 "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW".
179 )"));
180   }
181 
Valueperfetto::trace_processor::__anon3bbd39e10111::InternalLayout182   static void Value(sqlite3_context* ctx) {
183     base::StatusOr<SlicePacker*> slice_packer =
184         GetOrCreateAggregationContext(ctx);
185     if (!slice_packer.ok()) {
186       return sqlite::utils::SetError(ctx, kFunctionName, slice_packer.status());
187     }
188     return sqlite::result::Long(
189         ctx, static_cast<int64_t>(slice_packer.value()->GetLastDepth()));
190   }
191 
Finalperfetto::trace_processor::__anon3bbd39e10111::InternalLayout192   static void Final(sqlite3_context* ctx) {
193     auto** slice_packer = static_cast<SlicePacker**>(
194         sqlite3_aggregate_context(ctx, sizeof(SlicePacker*)));
195     if (!slice_packer || !*slice_packer) {
196       return;
197     }
198     sqlite::result::Long(ctx,
199                          static_cast<int64_t>((*slice_packer)->GetLastDepth()));
200     delete *slice_packer;
201   }
202 };
203 
204 }  // namespace
205 
RegisterLayoutFunctions(PerfettoSqlEngine & engine)206 base::Status RegisterLayoutFunctions(PerfettoSqlEngine& engine) {
207   return engine.RegisterSqliteWindowFunction<InternalLayout>(kFunctionName, 2,
208                                                              nullptr);
209 }
210 
211 }  // namespace perfetto::trace_processor
212