1 /*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/trace_processor/perfetto_sql/intrinsics/operators/span_join_operator.h"
18
#include <sqlite3.h>
#include <algorithm>
#include <cmath>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <limits>
#include <memory>
#include <optional>
#include <string>
#include <tuple>
#include <unordered_set>
#include <utility>
#include <vector>
31
32 #include "perfetto/base/compiler.h"
33 #include "perfetto/base/logging.h"
34 #include "perfetto/base/status.h"
35 #include "perfetto/ext/base/string_splitter.h"
36 #include "perfetto/ext/base/string_utils.h"
37 #include "perfetto/trace_processor/basic_types.h"
38 #include "src/trace_processor/perfetto_sql/engine/perfetto_sql_engine.h"
39 #include "src/trace_processor/sqlite/bindings/sqlite_result.h"
40 #include "src/trace_processor/sqlite/module_lifecycle_manager.h"
41 #include "src/trace_processor/sqlite/sql_source.h"
42 #include "src/trace_processor/sqlite/sqlite_utils.h"
43 #include "src/trace_processor/tp_metatrace.h"
44 #include "src/trace_processor/util/status_macros.h"
45
46 #include "protos/perfetto/trace_processor/metatrace_categories.pbzero.h"
47
48 namespace perfetto::trace_processor {
49
50 namespace {
51
52 constexpr char kTsColumnName[] = "ts";
53 constexpr char kDurColumnName[] = "dur";
54
IsRequiredColumn(const std::string & name)55 bool IsRequiredColumn(const std::string& name) {
56 return name == kTsColumnName || name == kDurColumnName;
57 }
58
HasDuplicateColumns(const std::vector<std::pair<SqlValue::Type,std::string>> & t1,const std::vector<std::pair<SqlValue::Type,std::string>> & t2,const std::optional<std::string> & partition_col)59 std::optional<std::string> HasDuplicateColumns(
60 const std::vector<std::pair<SqlValue::Type, std::string>>& t1,
61 const std::vector<std::pair<SqlValue::Type, std::string>>& t2,
62 const std::optional<std::string>& partition_col) {
63 std::unordered_set<std::string> seen_names;
64 for (const auto& col : t1) {
65 if (IsRequiredColumn(col.second) || col.second == partition_col) {
66 continue;
67 }
68 if (seen_names.count(col.second) > 0) {
69 return col.second;
70 }
71 seen_names.insert(col.second);
72 }
73 for (const auto& col : t2) {
74 if (IsRequiredColumn(col.second) || col.second == partition_col) {
75 continue;
76 }
77 if (seen_names.count(col.second) > 0) {
78 return col.second;
79 }
80 seen_names.insert(col.second);
81 }
82 return std::nullopt;
83 }
84
OpToString(int op)85 std::string OpToString(int op) {
86 switch (op) {
87 case SQLITE_INDEX_CONSTRAINT_EQ:
88 return "=";
89 case SQLITE_INDEX_CONSTRAINT_NE:
90 return "!=";
91 case SQLITE_INDEX_CONSTRAINT_GE:
92 return ">=";
93 case SQLITE_INDEX_CONSTRAINT_GT:
94 return ">";
95 case SQLITE_INDEX_CONSTRAINT_LE:
96 return "<=";
97 case SQLITE_INDEX_CONSTRAINT_LT:
98 return "<";
99 case SQLITE_INDEX_CONSTRAINT_LIKE:
100 return " like ";
101 case SQLITE_INDEX_CONSTRAINT_GLOB:
102 return " glob ";
103 case SQLITE_INDEX_CONSTRAINT_ISNULL:
104 // The "null" will be added below in EscapedSqliteValueAsString.
105 return " is ";
106 case SQLITE_INDEX_CONSTRAINT_ISNOTNULL:
107 // The "null" will be added below in EscapedSqliteValueAsString.
108 return " is not ";
109 default:
110 PERFETTO_FATAL("Operator to string conversion not impemented for %d", op);
111 }
112 }
113
EscapedSqliteValueAsString(sqlite3_value * value)114 std::string EscapedSqliteValueAsString(sqlite3_value* value) {
115 switch (sqlite3_value_type(value)) {
116 case SQLITE_INTEGER:
117 return std::to_string(sqlite3_value_int64(value));
118 case SQLITE_FLOAT:
119 return std::to_string(sqlite3_value_double(value));
120 case SQLITE_TEXT: {
121 // If str itself contains a single quote, we need to escape it with
122 // another single quote.
123 const char* str =
124 reinterpret_cast<const char*>(sqlite3_value_text(value));
125 return "'" + base::ReplaceAll(str, "'", "''") + "'";
126 }
127 case SQLITE_NULL:
128 return " null";
129 default:
130 PERFETTO_FATAL("Unknown value type %d", sqlite3_value_type(value));
131 }
132 }
133
134 } // namespace
135
PopulateColumnLocatorMap(uint32_t offset)136 void SpanJoinOperatorModule::State::PopulateColumnLocatorMap(uint32_t offset) {
137 for (uint32_t i = 0; i < t1_defn.columns().size(); ++i) {
138 if (i == t1_defn.ts_idx() || i == t1_defn.dur_idx() ||
139 i == t1_defn.partition_idx()) {
140 continue;
141 }
142 ColumnLocator* locator = &global_index_to_column_locator[offset++];
143 locator->defn = &t1_defn;
144 locator->col_index = i;
145 }
146 for (uint32_t i = 0; i < t2_defn.columns().size(); ++i) {
147 if (i == t2_defn.ts_idx() || i == t2_defn.dur_idx() ||
148 i == t2_defn.partition_idx()) {
149 continue;
150 }
151 ColumnLocator* locator = &global_index_to_column_locator[offset++];
152 locator->defn = &t2_defn;
153 locator->col_index = i;
154 }
155 }
156
BestIndexStrForDefinition(const sqlite3_index_info * info,const TableDefinition & defn)157 std::string SpanJoinOperatorModule::State::BestIndexStrForDefinition(
158 const sqlite3_index_info* info,
159 const TableDefinition& defn) {
160 uint32_t count = 0;
161 std::string constraints;
162 for (int i = 0; i < info->nConstraint; i++) {
163 const auto& c = info->aConstraint[i];
164 if (!c.usable) {
165 continue;
166 }
167
168 auto col_name = GetNameForGlobalColumnIndex(defn, c.iColumn);
169 if (col_name.empty()) {
170 continue;
171 }
172
173 // Le constraints can be passed straight to the child tables as they won't
174 // affect the span join computation. Similarily, source_geq constraints
175 // explicitly request that they are passed as geq constraints to the source
176 // tables.
177 if (col_name == kTsColumnName && !sqlite::utils::IsOpLe(c.op) &&
178 c.op != kSourceGeqOpCode) {
179 continue;
180 }
181
182 // Allow SQLite handle any constraints on duration apart from source_geq
183 // constraints.
184 if (col_name == kDurColumnName && c.op != kSourceGeqOpCode) {
185 continue;
186 }
187
188 // If we're emitting shadow slices, don't propogate any constraints
189 // on this table as this will break the shadow slice computation.
190 if (defn.ShouldEmitPresentPartitionShadow()) {
191 continue;
192 }
193
194 PERFETTO_DCHECK(info->aConstraintUsage[i].argvIndex > 0);
195 std::string argvIndex =
196 std::to_string(info->aConstraintUsage[i].argvIndex - 1);
197 std::string op = OpToString(
198 c.op == kSourceGeqOpCode ? SQLITE_INDEX_CONSTRAINT_GE : c.op);
199 constraints += "," + argvIndex + "," + "`" + col_name + "`" + op;
200 count++;
201 }
202 return std::to_string(count) + constraints;
203 }
204
Create(PerfettoSqlEngine * engine,const TableDescriptor & desc,EmitShadowType emit_shadow_type,TableDefinition * defn)205 base::Status SpanJoinOperatorModule::TableDefinition::Create(
206 PerfettoSqlEngine* engine,
207 const TableDescriptor& desc,
208 EmitShadowType emit_shadow_type,
209 TableDefinition* defn) {
210 if (desc.partition_col == kTsColumnName ||
211 desc.partition_col == kDurColumnName) {
212 return base::ErrStatus(
213 "SPAN_JOIN: partition column cannot be any of {ts, dur} for table %s",
214 desc.name.c_str());
215 }
216
217 std::vector<std::pair<SqlValue::Type, std::string>> cols;
218 RETURN_IF_ERROR(sqlite::utils::GetColumnsForTable(
219 engine->sqlite_engine()->db(), desc.name, cols));
220
221 uint32_t required_columns_found = 0;
222 uint32_t ts_idx = std::numeric_limits<uint32_t>::max();
223 uint32_t dur_idx = std::numeric_limits<uint32_t>::max();
224 uint32_t partition_idx = std::numeric_limits<uint32_t>::max();
225 for (uint32_t i = 0; i < cols.size(); i++) {
226 auto col = cols[i];
227 if (IsRequiredColumn(col.second)) {
228 ++required_columns_found;
229 }
230 if (base::Contains(col.second, ",")) {
231 return base::ErrStatus("SPAN_JOIN: column '%s' cannot contain any ','",
232 col.second.c_str());
233 }
234 if (base::Contains(col.second, ':')) {
235 return base::ErrStatus("SPAN_JOIN: column '%s' cannot contain any ':'",
236 col.second.c_str());
237 }
238
239 if (col.second == kTsColumnName) {
240 ts_idx = i;
241 } else if (col.second == kDurColumnName) {
242 dur_idx = i;
243 } else if (col.second == desc.partition_col) {
244 partition_idx = i;
245 }
246 }
247 if (required_columns_found != 2) {
248 return base::ErrStatus(
249 "SPAN_JOIN: Missing one of columns {ts, dur} in table %s",
250 desc.name.c_str());
251 }
252 if (desc.IsPartitioned() && partition_idx >= cols.size()) {
253 return base::ErrStatus(
254 "SPAN_JOIN: Missing partition column '%s' in table '%s'",
255 desc.partition_col.c_str(), desc.name.c_str());
256 }
257
258 PERFETTO_DCHECK(ts_idx < cols.size());
259 PERFETTO_DCHECK(dur_idx < cols.size());
260
261 *defn = TableDefinition(desc.name, desc.partition_col, std::move(cols),
262 emit_shadow_type, ts_idx, dur_idx, partition_idx);
263 return base::OkStatus();
264 }
265
266 std::string
CreateVtabCreateTableSection() const267 SpanJoinOperatorModule::TableDefinition::CreateVtabCreateTableSection() const {
268 std::string cols;
269 for (const auto& col : columns()) {
270 if (IsRequiredColumn(col.second) || col.second == partition_col()) {
271 continue;
272 }
273 if (col.first == SqlValue::Type::kNull) {
274 cols += col.second + ",";
275 } else {
276 cols += col.second + " " +
277 sqlite::utils::SqlValueTypeToSqliteTypeName(col.first) + ",";
278 }
279 }
280 return cols;
281 }
282
GetNameForGlobalColumnIndex(const TableDefinition & defn,int global_column)283 std::string SpanJoinOperatorModule::State::GetNameForGlobalColumnIndex(
284 const TableDefinition& defn,
285 int global_column) {
286 auto col_idx = static_cast<size_t>(global_column);
287 if (col_idx == Column::kTimestamp) {
288 return kTsColumnName;
289 }
290 if (col_idx == Column::kDuration) {
291 return kDurColumnName;
292 }
293 if (col_idx == Column::kPartition &&
294 partitioning != PartitioningType::kNoPartitioning) {
295 return defn.partition_col();
296 }
297
298 const auto& locator = global_index_to_column_locator[col_idx];
299 if (locator.defn != &defn) {
300 return "";
301 }
302 return defn.columns()[locator.col_index].second;
303 }
304
Query(SpanJoinOperatorModule::State * state,const TableDefinition * definition)305 SpanJoinOperatorModule::Query::Query(SpanJoinOperatorModule::State* state,
306 const TableDefinition* definition)
307 : defn_(definition), in_state_(state) {
308 PERFETTO_DCHECK(!defn_->IsPartitioned() ||
309 defn_->partition_idx() < defn_->columns().size());
310 }
311
312 SpanJoinOperatorModule::Query::~Query() = default;
313
Initialize(std::string sql_query,InitialEofBehavior eof_behavior)314 base::Status SpanJoinOperatorModule::Query::Initialize(
315 std::string sql_query,
316 InitialEofBehavior eof_behavior) {
317 *this = Query(in_state_, definition());
318 sql_query_ = std::move(sql_query);
319 base::Status status = Rewind();
320 if (!status.ok())
321 return status;
322 if (eof_behavior == InitialEofBehavior::kTreatAsMissingPartitionShadow &&
323 IsEof()) {
324 state_ = State::kMissingPartitionShadow;
325 }
326 return status;
327 }
328
Next()329 base::Status SpanJoinOperatorModule::Query::Next() {
330 RETURN_IF_ERROR(NextSliceState());
331 return FindNextValidSlice();
332 }
333
IsValidSlice()334 bool SpanJoinOperatorModule::Query::IsValidSlice() {
335 // Disallow any single partition shadow slices if the definition doesn't allow
336 // them.
337 if (IsPresentPartitionShadow() && !defn_->ShouldEmitPresentPartitionShadow())
338 return false;
339
340 // Disallow any missing partition shadow slices if the definition doesn't
341 // allow them.
342 if (IsMissingPartitionShadow() && !defn_->ShouldEmitMissingPartitionShadow())
343 return false;
344
345 // Disallow any "empty" shadows; these are shadows which either have the same
346 // start and end time or missing-partition shadows which have the same start
347 // and end partition.
348 if (IsEmptyShadow())
349 return false;
350
351 return true;
352 }
353
FindNextValidSlice()354 base::Status SpanJoinOperatorModule::Query::FindNextValidSlice() {
355 // The basic idea of this function is that |NextSliceState()| always emits
356 // all possible slices (including shadows for any gaps inbetween the real
357 // slices) and we filter out the invalid slices (as defined by the table
358 // definition) using |IsValidSlice()|.
359 //
360 // This has proved to be a lot cleaner to implement than trying to choose
361 // when to emit and not emit shadows directly.
362 while (!IsEof() && !IsValidSlice()) {
363 RETURN_IF_ERROR(NextSliceState());
364 }
365 return base::OkStatus();
366 }
367
NextSliceState()368 base::Status SpanJoinOperatorModule::Query::NextSliceState() {
369 switch (state_) {
370 case State::kReal: {
371 // Forward the cursor to figure out where the next slice should be.
372 RETURN_IF_ERROR(CursorNext());
373
374 // Depending on the next slice, we can do two things here:
375 // 1. If the next slice is on the same partition, we can just emit a
376 // single shadow until the start of the next slice.
377 // 2. If the next slice is on another partition or we hit eof, just emit
378 // a shadow to the end of the whole partition.
379 bool shadow_to_end = cursor_eof_ || (defn_->IsPartitioned() &&
380 partition_ != CursorPartition());
381 state_ = State::kPresentPartitionShadow;
382 ts_ = AdjustedTsEnd();
383 ts_end_ =
384 shadow_to_end ? std::numeric_limits<int64_t>::max() : CursorTs();
385 return base::OkStatus();
386 }
387 case State::kPresentPartitionShadow: {
388 if (ts_end_ == std::numeric_limits<int64_t>::max()) {
389 // If the shadow is to the end of the slice, create a missing partition
390 // shadow to the start of the partition of the next slice or to the max
391 // partition if we hit eof.
392 state_ = State::kMissingPartitionShadow;
393 ts_ = 0;
394 ts_end_ = std::numeric_limits<int64_t>::max();
395
396 missing_partition_start_ = partition_ + 1;
397 missing_partition_end_ = cursor_eof_
398 ? std::numeric_limits<int64_t>::max()
399 : CursorPartition();
400 } else {
401 // If the shadow is not to the end, we must have another slice on the
402 // current partition.
403 state_ = State::kReal;
404 ts_ = CursorTs();
405 ts_end_ = ts_ + CursorDur();
406
407 PERFETTO_DCHECK(!defn_->IsPartitioned() ||
408 partition_ == CursorPartition());
409 }
410 return base::OkStatus();
411 }
412 case State::kMissingPartitionShadow: {
413 if (missing_partition_end_ == std::numeric_limits<int64_t>::max()) {
414 PERFETTO_DCHECK(cursor_eof_);
415
416 // If we have a missing partition to the max partition, we must have hit
417 // eof.
418 state_ = State::kEof;
419 } else {
420 PERFETTO_DCHECK(!defn_->IsPartitioned() ||
421 CursorPartition() == missing_partition_end_);
422
423 // Otherwise, setup a single partition slice on the end partition to the
424 // start of the next slice.
425 state_ = State::kPresentPartitionShadow;
426 ts_ = 0;
427 ts_end_ = CursorTs();
428 partition_ = missing_partition_end_;
429 }
430 return base::OkStatus();
431 }
432 case State::kEof: {
433 PERFETTO_DFATAL("Called Next when EOF");
434 return base::ErrStatus("Called Next when EOF");
435 }
436 }
437 PERFETTO_FATAL("For GCC");
438 }
439
Rewind()440 base::Status SpanJoinOperatorModule::Query::Rewind() {
441 auto res = in_state_->engine->sqlite_engine()->PrepareStatement(
442 SqlSource::FromTraceProcessorImplementation(sql_query_));
443 cursor_eof_ = false;
444 RETURN_IF_ERROR(res.status());
445 stmt_ = std::move(res);
446
447 RETURN_IF_ERROR(CursorNext());
448
449 // Setup the first slice as a missing partition shadow from the lowest
450 // partition until the first slice partition. We will handle finding the real
451 // slice in |FindNextValidSlice()|.
452 state_ = State::kMissingPartitionShadow;
453 ts_ = 0;
454 ts_end_ = std::numeric_limits<int64_t>::max();
455 missing_partition_start_ = std::numeric_limits<int64_t>::min();
456
457 if (cursor_eof_) {
458 missing_partition_end_ = std::numeric_limits<int64_t>::max();
459 } else if (defn_->IsPartitioned()) {
460 missing_partition_end_ = CursorPartition();
461 } else {
462 missing_partition_end_ = std::numeric_limits<int64_t>::min();
463 }
464
465 // Actually compute the first valid slice.
466 return FindNextValidSlice();
467 }
468
CursorNext()469 base::Status SpanJoinOperatorModule::Query::CursorNext() {
470 if (defn_->IsPartitioned()) {
471 auto partition_idx = static_cast<int>(defn_->partition_idx());
472 // Fastforward through any rows with null partition keys.
473 int row_type;
474 do {
475 cursor_eof_ = !stmt_->Step();
476 RETURN_IF_ERROR(stmt_->status());
477 row_type = sqlite3_column_type(stmt_->sqlite_stmt(), partition_idx);
478 } while (!cursor_eof_ && row_type == SQLITE_NULL);
479
480 if (!cursor_eof_ && row_type != SQLITE_INTEGER) {
481 return base::ErrStatus("SPAN_JOIN: partition is not an INT column");
482 }
483 } else {
484 cursor_eof_ = !stmt_->Step();
485 }
486 return base::OkStatus();
487 }
488
ReportSqliteResult(sqlite3_context * context,size_t index)489 void SpanJoinOperatorModule::Query::ReportSqliteResult(sqlite3_context* context,
490 size_t index) {
491 if (state_ != State::kReal) {
492 return sqlite::result::Null(context);
493 }
494
495 sqlite3_stmt* stmt = stmt_->sqlite_stmt();
496 int idx = static_cast<int>(index);
497 switch (sqlite3_column_type(stmt, idx)) {
498 case SQLITE_INTEGER:
499 return sqlite::result::Long(context, sqlite3_column_int64(stmt, idx));
500 case SQLITE_FLOAT:
501 return sqlite::result::Double(context, sqlite3_column_double(stmt, idx));
502 case SQLITE_TEXT: {
503 // TODO(lalitm): note for future optimizations: if we knew the addresses
504 // of the string intern pool, we could check if the string returned here
505 // comes from the pool, and pass it as non-transient.
506 const auto* ptr =
507 reinterpret_cast<const char*>(sqlite3_column_text(stmt, idx));
508 return sqlite::result::TransientString(context, ptr);
509 }
510 case SQLITE_BLOB: {
511 return sqlite::result::TransientBytes(context,
512 sqlite3_column_blob(stmt, idx),
513 sqlite3_column_bytes(stmt, idx));
514 }
515 }
516 }
517
TableDefinition(std::string name,std::string partition_col,std::vector<std::pair<SqlValue::Type,std::string>> cols,EmitShadowType emit_shadow_type,uint32_t ts_idx,uint32_t dur_idx,uint32_t partition_idx)518 SpanJoinOperatorModule::TableDefinition::TableDefinition(
519 std::string name,
520 std::string partition_col,
521 std::vector<std::pair<SqlValue::Type, std::string>> cols,
522 EmitShadowType emit_shadow_type,
523 uint32_t ts_idx,
524 uint32_t dur_idx,
525 uint32_t partition_idx)
526 : emit_shadow_type_(emit_shadow_type),
527 name_(std::move(name)),
528 partition_col_(std::move(partition_col)),
529 cols_(std::move(cols)),
530 ts_idx_(ts_idx),
531 dur_idx_(dur_idx),
532 partition_idx_(partition_idx) {}
533
Parse(const std::string & raw_descriptor,TableDescriptor * descriptor)534 base::Status SpanJoinOperatorModule::TableDescriptor::Parse(
535 const std::string& raw_descriptor,
536 TableDescriptor* descriptor) {
537 // Descriptors have one of the following forms:
538 // table_name [PARTITIONED column_name]
539
540 // Find the table name.
541 base::StringSplitter splitter(raw_descriptor, ' ');
542 if (!splitter.Next())
543 return base::ErrStatus("SPAN_JOIN: Missing table name");
544
545 descriptor->name = splitter.cur_token();
546 if (!splitter.Next())
547 return base::OkStatus();
548
549 if (!base::CaseInsensitiveEqual(splitter.cur_token(), "PARTITIONED"))
550 return base::ErrStatus("SPAN_JOIN: Invalid token");
551
552 if (!splitter.Next())
553 return base::ErrStatus("SPAN_JOIN: Missing partitioning column");
554
555 descriptor->partition_col = splitter.cur_token();
556 return base::OkStatus();
557 }
558
CreateSqlQuery(base::StringSplitter & idx,sqlite3_value ** argv) const559 std::string SpanJoinOperatorModule::TableDefinition::CreateSqlQuery(
560 base::StringSplitter& idx,
561 sqlite3_value** argv) const {
562 std::vector<std::string> col_names;
563 for (const auto& c : columns()) {
564 col_names.push_back("`" + c.second + "`");
565 }
566
567 PERFETTO_CHECK(idx.Next());
568 std::optional<uint32_t> cs_count = base::StringToUInt32(idx.cur_token());
569 PERFETTO_CHECK(cs_count);
570 std::vector<std::string> cs;
571 cs.reserve(*cs_count);
572 for (uint32_t i = 0; i < *cs_count; ++i) {
573 PERFETTO_CHECK(idx.Next());
574 std::optional<uint32_t> argv_idx = base::StringToUInt32(idx.cur_token());
575 PERFETTO_CHECK(argv_idx);
576
577 PERFETTO_CHECK(idx.Next());
578 cs.emplace_back(idx.cur_token() +
579 EscapedSqliteValueAsString(argv[*argv_idx]));
580 }
581
582 std::string sql = "SELECT " + base::Join(col_names, ", ");
583 sql += " FROM " + name();
584 if (!cs.empty()) {
585 sql += " WHERE " + base::Join(cs, " AND ");
586 }
587 sql += " ORDER BY ";
588 sql += IsPartitioned() ? base::Join({"`" + partition_col() + "`", "ts"}, ", ")
589 : "ts";
590 sql += ";";
591 return sql;
592 }
593
Create(sqlite3 * db,void * ctx,int argc,const char * const * argv,sqlite3_vtab ** vtab,char ** pzErr)594 int SpanJoinOperatorModule::Create(sqlite3* db,
595 void* ctx,
596 int argc,
597 const char* const* argv,
598 sqlite3_vtab** vtab,
599 char** pzErr) {
600 // argv[0] - argv[2] are SQLite populated fields which are always present.
601 if (argc < 5) {
602 *pzErr = sqlite3_mprintf("SPAN_JOIN: expected at least 2 args");
603 return SQLITE_ERROR;
604 }
605
606 auto* context = GetContext(ctx);
607 auto state = std::make_unique<State>();
608 state->engine = context->engine;
609 state->module_name = argv[0];
610
611 TableDescriptor t1_desc;
612 auto status = TableDescriptor::Parse(
613 std::string(reinterpret_cast<const char*>(argv[3])), &t1_desc);
614 if (!status.ok()) {
615 *pzErr = sqlite3_mprintf("%s", status.c_message());
616 return SQLITE_ERROR;
617 }
618
619 TableDescriptor t2_desc;
620 status = TableDescriptor::Parse(
621 std::string(reinterpret_cast<const char*>(argv[4])), &t2_desc);
622 if (!status.ok()) {
623 *pzErr = sqlite3_mprintf("%s", status.c_message());
624 return SQLITE_ERROR;
625 }
626
627 // Check that the partition columns match between the two tables.
628 if (t1_desc.partition_col == t2_desc.partition_col) {
629 state->partitioning = t1_desc.IsPartitioned()
630 ? PartitioningType::kSamePartitioning
631 : PartitioningType::kNoPartitioning;
632 } else if (t1_desc.IsPartitioned() && t2_desc.IsPartitioned()) {
633 *pzErr = sqlite3_mprintf(
634 "SPAN_JOIN: mismatching partitions between the two tables; "
635 "(partition %s in table %s, partition %s in table %s)",
636 t1_desc.partition_col.c_str(), t1_desc.name.c_str(),
637 t2_desc.partition_col.c_str(), t2_desc.name.c_str());
638 return SQLITE_ERROR;
639 } else {
640 state->partitioning = PartitioningType::kMixedPartitioning;
641 }
642
643 bool t1_part_mixed =
644 t1_desc.IsPartitioned() &&
645 state->partitioning == PartitioningType::kMixedPartitioning;
646 bool t2_part_mixed =
647 t2_desc.IsPartitioned() &&
648 state->partitioning == PartitioningType::kMixedPartitioning;
649
650 EmitShadowType t1_shadow_type;
651 if (state->IsOuterJoin()) {
652 if (t1_part_mixed ||
653 state->partitioning == PartitioningType::kNoPartitioning) {
654 t1_shadow_type = EmitShadowType::kPresentPartitionOnly;
655 } else {
656 t1_shadow_type = EmitShadowType::kAll;
657 }
658 } else {
659 t1_shadow_type = EmitShadowType::kNone;
660 }
661 status = TableDefinition::Create(state->engine, t1_desc, t1_shadow_type,
662 &state->t1_defn);
663 if (!status.ok()) {
664 *pzErr = sqlite3_mprintf("%s", status.c_message());
665 return SQLITE_ERROR;
666 }
667
668 EmitShadowType t2_shadow_type;
669 if (state->IsOuterJoin() || state->IsLeftJoin()) {
670 if (t2_part_mixed ||
671 state->partitioning == PartitioningType::kNoPartitioning) {
672 t2_shadow_type = EmitShadowType::kPresentPartitionOnly;
673 } else {
674 t2_shadow_type = EmitShadowType::kAll;
675 }
676 } else {
677 t2_shadow_type = EmitShadowType::kNone;
678 }
679 status = TableDefinition::Create(state->engine, t2_desc, t2_shadow_type,
680 &state->t2_defn);
681 if (!status.ok()) {
682 *pzErr = sqlite3_mprintf("%s", status.c_message());
683 return SQLITE_ERROR;
684 }
685
686 if (auto dupe = HasDuplicateColumns(
687 state->t1_defn.columns(), state->t2_defn.columns(),
688 state->partitioning == PartitioningType::kNoPartitioning
689 ? std::nullopt
690 : std::make_optional(state->partition_col()))) {
691 *pzErr = sqlite3_mprintf(
692 "SPAN_JOIN: column %s present in both tables %s and %s", dupe->c_str(),
693 state->t1_defn.name().c_str(), state->t2_defn.name().c_str());
694 return SQLITE_ERROR;
695 }
696
697 // Create the map from column index to the column in the child sub-queries.
698 state->PopulateColumnLocatorMap(
699 state->partitioning == PartitioningType::kNoPartitioning ? 2 : 3);
700
701 std::string primary_key = "ts";
702 std::string partition;
703 if (state->partitioning != PartitioningType::kNoPartitioning) {
704 partition = state->partition_col() + " BIGINT,";
705 primary_key += ", " + state->partition_col();
706 }
707 std::string t1_section = state->t1_defn.CreateVtabCreateTableSection();
708 std::string t2_section = state->t2_defn.CreateVtabCreateTableSection();
709 static constexpr char kStmt[] = R"(
710 CREATE TABLE x(
711 ts BIGINT,
712 dur BIGINT,
713 %s
714 %s
715 %s
716 PRIMARY KEY(%s)
717 )
718 )";
719 base::StackString<1024> create_table_str(
720 kStmt, partition.c_str(), t1_section.c_str(), t2_section.c_str(),
721 primary_key.c_str());
722 state->create_table_stmt = create_table_str.ToStdString();
723 if (int ret = sqlite3_declare_vtab(db, create_table_str.c_str());
724 ret != SQLITE_OK) {
725 return ret;
726 }
727
728 std::unique_ptr<Vtab> res = std::make_unique<Vtab>();
729 res->state = context->manager.OnCreate(argv, std::move(state));
730 *vtab = res.release();
731 return SQLITE_OK;
732 }
733
Destroy(sqlite3_vtab * vtab)734 int SpanJoinOperatorModule::Destroy(sqlite3_vtab* vtab) {
735 std::unique_ptr<Vtab> tab(GetVtab(vtab));
736 sqlite::ModuleStateManager<SpanJoinOperatorModule>::OnDestroy(tab->state);
737 return SQLITE_OK;
738 }
739
Connect(sqlite3 * db,void * ctx,int,const char * const * argv,sqlite3_vtab ** vtab,char **)740 int SpanJoinOperatorModule::Connect(sqlite3* db,
741 void* ctx,
742 int,
743 const char* const* argv,
744 sqlite3_vtab** vtab,
745 char**) {
746 auto* context = GetContext(ctx);
747 std::unique_ptr<Vtab> res = std::make_unique<Vtab>();
748 res->state = context->manager.OnConnect(argv);
749
750 auto* state =
751 sqlite::ModuleStateManager<SpanJoinOperatorModule>::GetState(res->state);
752 if (int ret = sqlite3_declare_vtab(db, state->create_table_stmt.c_str());
753 ret != SQLITE_OK) {
754 return ret;
755 }
756 *vtab = res.release();
757 return SQLITE_OK;
758 }
759
Disconnect(sqlite3_vtab * vtab)760 int SpanJoinOperatorModule::Disconnect(sqlite3_vtab* vtab) {
761 std::unique_ptr<Vtab> tab(GetVtab(vtab));
762 sqlite::ModuleStateManager<SpanJoinOperatorModule>::OnDisconnect(tab->state);
763 return SQLITE_OK;
764 }
765
BestIndex(sqlite3_vtab * tab,sqlite3_index_info * info)766 int SpanJoinOperatorModule::BestIndex(sqlite3_vtab* tab,
767 sqlite3_index_info* info) {
768 int argvIndex = 1;
769 for (int i = 0; i < info->nConstraint; ++i) {
770 if (!info->aConstraint[i].usable) {
771 continue;
772 }
773 info->aConstraintUsage[i].argvIndex = argvIndex++;
774 }
775
776 Vtab* table = GetVtab(tab);
777 State* state = sqlite::ModuleStateManager<SpanJoinOperatorModule>::GetState(
778 table->state);
779 if (state->partitioning == PartitioningType::kNoPartitioning) {
780 // If both tables are not partitioned and we have a single order by on ts,
781 // we return data in the correct order.
782 info->orderByConsumed = info->nOrderBy == 1 &&
783 info->aOrderBy[0].iColumn == Column::kTimestamp &&
784 !info->aOrderBy[0].desc;
785 } else {
786 // If one of the tables is partitioned, and we have an order by on the
787 // partition column followed (optionally) by an order by on timestamp, we
788 // return data in the correct order.
789 bool is_first_ob_partition =
790 info->nOrderBy > 0 && info->aOrderBy[0].iColumn == Column::kPartition &&
791 !info->aOrderBy[0].desc;
792 bool is_second_ob_ts = info->nOrderBy >= 2 &&
793 info->aOrderBy[1].iColumn == Column::kTimestamp &&
794 !info->aOrderBy[1].desc;
795 info->orderByConsumed =
796 (info->nOrderBy == 1 && is_first_ob_partition) ||
797 (info->nOrderBy == 2 && is_first_ob_partition && is_second_ob_ts);
798 }
799
800 for (int i = 0; i < info->nConstraint; ++i) {
801 if (info->aConstraint[i].op == kSourceGeqOpCode) {
802 info->aConstraintUsage[i].omit = true;
803 }
804 }
805
806 std::string t1 = state->BestIndexStrForDefinition(info, state->t1_defn);
807 std::string t2 = state->BestIndexStrForDefinition(info, state->t2_defn);
808 info->idxStr = sqlite3_mprintf("%s,%s", t1.c_str(), t2.c_str());
809 info->needToFreeIdxStr = true;
810
811 return SQLITE_OK;
812 }
813
Open(sqlite3_vtab * tab,sqlite3_vtab_cursor ** cursor)814 int SpanJoinOperatorModule::Open(sqlite3_vtab* tab,
815 sqlite3_vtab_cursor** cursor) {
816 State* state = sqlite::ModuleStateManager<SpanJoinOperatorModule>::GetState(
817 GetVtab(tab)->state);
818 std::unique_ptr<Cursor> c = std::make_unique<Cursor>(state);
819 *cursor = c.release();
820 return SQLITE_OK;
821 }
822
Close(sqlite3_vtab_cursor * cursor)823 int SpanJoinOperatorModule::Close(sqlite3_vtab_cursor* cursor) {
824 std::unique_ptr<Cursor> c(GetCursor(cursor));
825 return SQLITE_OK;
826 }
827
Filter(sqlite3_vtab_cursor * cursor,int,const char * idxStr,int,sqlite3_value ** argv)828 int SpanJoinOperatorModule::Filter(sqlite3_vtab_cursor* cursor,
829 int,
830 const char* idxStr,
831 int,
832 sqlite3_value** argv) {
833 PERFETTO_TP_TRACE(metatrace::Category::QUERY_DETAILED, "SPAN_JOIN_XFILTER");
834
835 Cursor* c = GetCursor(cursor);
836 Vtab* table = GetVtab(cursor->pVtab);
837 State* state = sqlite::ModuleStateManager<SpanJoinOperatorModule>::GetState(
838 table->state);
839
840 base::StringSplitter splitter(std::string(idxStr), ',');
841 bool t1_partitioned_mixed =
842 c->t1.definition()->IsPartitioned() &&
843 state->partitioning == PartitioningType::kMixedPartitioning;
844 auto t1_eof = state->IsOuterJoin() && !t1_partitioned_mixed
845 ? Query::InitialEofBehavior::kTreatAsMissingPartitionShadow
846 : Query::InitialEofBehavior::kTreatAsEof;
847 base::Status status =
848 c->t1.Initialize(state->t1_defn.CreateSqlQuery(splitter, argv), t1_eof);
849 if (!status.ok()) {
850 return sqlite::utils::SetError(table, status.c_message());
851 }
852
853 bool t2_partitioned_mixed =
854 c->t2.definition()->IsPartitioned() &&
855 state->partitioning == PartitioningType::kMixedPartitioning;
856 auto t2_eof =
857 (state->IsLeftJoin() || state->IsOuterJoin()) && !t2_partitioned_mixed
858 ? Query::InitialEofBehavior::kTreatAsMissingPartitionShadow
859 : Query::InitialEofBehavior::kTreatAsEof;
860 status =
861 c->t2.Initialize(state->t2_defn.CreateSqlQuery(splitter, argv), t2_eof);
862 if (!status.ok()) {
863 return sqlite::utils::SetError(table, status.c_message());
864 }
865
866 status = c->FindOverlappingSpan();
867 if (!status.ok()) {
868 return sqlite::utils::SetError(table, status.c_message());
869 }
870 return SQLITE_OK;
871 }
872
Next(sqlite3_vtab_cursor * cursor)873 int SpanJoinOperatorModule::Next(sqlite3_vtab_cursor* cursor) {
874 Cursor* c = GetCursor(cursor);
875 Vtab* table = GetVtab(cursor->pVtab);
876 base::Status status = c->next_query->Next();
877 if (!status.ok()) {
878 return sqlite::utils::SetError(table, status.c_message());
879 }
880 status = c->FindOverlappingSpan();
881 if (!status.ok()) {
882 return sqlite::utils::SetError(table, status.c_message());
883 }
884 return SQLITE_OK;
885 }
886
Eof(sqlite3_vtab_cursor * cur)887 int SpanJoinOperatorModule::Eof(sqlite3_vtab_cursor* cur) {
888 Cursor* c = GetCursor(cur);
889 return c->t1.IsEof() || c->t2.IsEof();
890 }
891
Column(sqlite3_vtab_cursor * cursor,sqlite3_context * context,int N)892 int SpanJoinOperatorModule::Column(sqlite3_vtab_cursor* cursor,
893 sqlite3_context* context,
894 int N) {
895 Cursor* c = GetCursor(cursor);
896 Vtab* table = GetVtab(cursor->pVtab);
897 State* state = sqlite::ModuleStateManager<SpanJoinOperatorModule>::GetState(
898 table->state);
899
900 PERFETTO_DCHECK(c->t1.IsReal() || c->t2.IsReal());
901
902 switch (N) {
903 case Column::kTimestamp: {
904 auto max_ts = std::max(c->t1.ts(), c->t2.ts());
905 sqlite::result::Long(context, static_cast<sqlite3_int64>(max_ts));
906 break;
907 }
908 case Column::kDuration: {
909 auto max_start = std::max(c->t1.ts(), c->t2.ts());
910 auto min_end = std::min(c->t1.raw_ts_end(), c->t2.raw_ts_end());
911 auto dur = min_end - max_start;
912 sqlite::result::Long(context, static_cast<sqlite3_int64>(dur));
913 break;
914 }
915 case Column::kPartition: {
916 if (state->partitioning != PartitioningType::kNoPartitioning) {
917 int64_t partition;
918 if (state->partitioning == PartitioningType::kMixedPartitioning) {
919 partition = c->last_mixed_partition_;
920 } else {
921 partition = c->t1.IsReal() ? c->t1.partition() : c->t2.partition();
922 }
923 sqlite::result::Long(context, static_cast<sqlite3_int64>(partition));
924 break;
925 }
926 PERFETTO_FALLTHROUGH;
927 }
928 default: {
929 const auto* locator =
930 state->global_index_to_column_locator.Find(static_cast<size_t>(N));
931 PERFETTO_CHECK(locator);
932 if (locator->defn == c->t1.definition()) {
933 c->t1.ReportSqliteResult(context, locator->col_index);
934 } else {
935 c->t2.ReportSqliteResult(context, locator->col_index);
936 }
937 }
938 }
939 return SQLITE_OK;
940 }
941
Rowid(sqlite3_vtab_cursor *,sqlite_int64 *)942 int SpanJoinOperatorModule::Rowid(sqlite3_vtab_cursor*, sqlite_int64*) {
943 return SQLITE_ERROR;
944 }
945
FindFunction(sqlite3_vtab *,int,const char * name,FindFunctionFn ** fn,void **)946 int SpanJoinOperatorModule::FindFunction(sqlite3_vtab*,
947 int,
948 const char* name,
949 FindFunctionFn** fn,
950 void**) {
951 if (base::CaseInsensitiveEqual(name, "source_geq")) {
952 *fn = [](sqlite3_context* ctx, int, sqlite3_value**) {
953 return sqlite::result::Error(ctx, "Should not be called.");
954 };
955 return kSourceGeqOpCode;
956 }
957 return 0;
958 }
959
IsOverlappingSpan() const960 bool SpanJoinOperatorModule::Cursor::IsOverlappingSpan() const {
961 // If either of the tables are eof, then we cannot possibly have an
962 // overlapping span.
963 if (t1.IsEof() || t2.IsEof())
964 return false;
965
966 // One of the tables always needs to have a real span to have a valid
967 // overlapping span.
968 if (!t1.IsReal() && !t2.IsReal())
969 return false;
970
971 using PartitioningType = PartitioningType;
972 if (state->partitioning == PartitioningType::kSamePartitioning) {
973 // If both tables are partitioned, then ensure that the partitions overlap.
974 bool partition_in_bounds = (t1.FirstPartition() >= t2.FirstPartition() &&
975 t1.FirstPartition() <= t2.LastPartition()) ||
976 (t2.FirstPartition() >= t1.FirstPartition() &&
977 t2.FirstPartition() <= t1.LastPartition());
978 if (!partition_in_bounds)
979 return false;
980 }
981
982 // We consider all slices to be [start, end) - that is the range of
983 // timestamps has an open interval at the start but a closed interval
984 // at the end. (with the exception of dur == -1 which we treat as if
985 // end == start for the purpose of this function).
986 return (t1.ts() == t2.ts() && t1.IsReal() && t2.IsReal()) ||
987 (t1.ts() >= t2.ts() && t1.ts() < t2.AdjustedTsEnd()) ||
988 (t2.ts() >= t1.ts() && t2.ts() < t1.AdjustedTsEnd());
989 }
990
FindOverlappingSpan()991 base::Status SpanJoinOperatorModule::Cursor::FindOverlappingSpan() {
992 // We loop until we find a slice which overlaps from the two tables.
993 while (true) {
994 if (state->partitioning == PartitioningType::kMixedPartitioning) {
995 // If we have a mixed partition setup, we need to have special checks
996 // for eof and to reset the unpartitioned cursor every time the partition
997 // changes in the partitioned table.
998 auto* partitioned = t1.definition()->IsPartitioned() ? &t1 : &t2;
999 auto* unpartitioned = t1.definition()->IsPartitioned() ? &t2 : &t1;
1000
1001 // If the partitioned table reaches eof, then we are really done.
1002 if (partitioned->IsEof())
1003 break;
1004
1005 // If the partition has changed from the previous one, reset the cursor
1006 // and keep a lot of the new partition.
1007 if (last_mixed_partition_ != partitioned->partition()) {
1008 base::Status status = unpartitioned->Rewind();
1009 if (!status.ok())
1010 return status;
1011 last_mixed_partition_ = partitioned->partition();
1012 }
1013 } else if (t1.IsEof() || t2.IsEof()) {
1014 // For both no partition and same partition cases, either cursor ending
1015 // ends the whole span join.
1016 break;
1017 }
1018
1019 // Find which slice finishes first.
1020 next_query = FindEarliestFinishQuery();
1021
1022 // If the current span is overlapping, just finish there to emit the current
1023 // slice.
1024 if (IsOverlappingSpan())
1025 break;
1026
1027 // Otherwise, step to the next row.
1028 base::Status status = next_query->Next();
1029 if (!status.ok())
1030 return status;
1031 }
1032 return base::OkStatus();
1033 }
1034
1035 SpanJoinOperatorModule::Query*
FindEarliestFinishQuery()1036 SpanJoinOperatorModule::Cursor::FindEarliestFinishQuery() {
1037 int64_t t1_part;
1038 int64_t t2_part;
1039
1040 switch (state->partitioning) {
1041 case PartitioningType::kMixedPartitioning: {
1042 // If either table is EOF, forward the other table to try and make
1043 // the partitions not match anymore.
1044 if (t1.IsEof())
1045 return &t2;
1046 if (t2.IsEof())
1047 return &t1;
1048
1049 // Otherwise, just make the partition equal from both tables.
1050 t1_part = last_mixed_partition_;
1051 t2_part = last_mixed_partition_;
1052 break;
1053 }
1054 case PartitioningType::kSamePartitioning: {
1055 // Get the partition values from the cursor.
1056 t1_part = t1.LastPartition();
1057 t2_part = t2.LastPartition();
1058 break;
1059 }
1060 case PartitioningType::kNoPartitioning: {
1061 t1_part = 0;
1062 t2_part = 0;
1063 break;
1064 }
1065 }
1066
1067 // Prefer to forward the earliest cursors based on the following
1068 // lexiographical ordering:
1069 // 1. partition
1070 // 2. end timestamp
1071 // 3. whether the slice is real or shadow (shadow < real)
1072 bool t1_less = std::make_tuple(t1_part, t1.AdjustedTsEnd(), t1.IsReal()) <
1073 std::make_tuple(t2_part, t2.AdjustedTsEnd(), t2.IsReal());
1074 return t1_less ? &t1 : &t2;
1075 }
1076
1077 } // namespace perfetto::trace_processor
1078