1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/trace_processor/perfetto_sql/intrinsics/operators/span_join_operator.h"
18 
19 #include <sqlite3.h>
20 #include <algorithm>
21 #include <cstdint>
22 #include <cstring>
23 #include <limits>
24 #include <memory>
25 #include <optional>
26 #include <string>
27 #include <tuple>
28 #include <unordered_set>
29 #include <utility>
30 #include <vector>
31 
32 #include "perfetto/base/compiler.h"
33 #include "perfetto/base/logging.h"
34 #include "perfetto/base/status.h"
35 #include "perfetto/ext/base/string_splitter.h"
36 #include "perfetto/ext/base/string_utils.h"
37 #include "perfetto/trace_processor/basic_types.h"
38 #include "src/trace_processor/perfetto_sql/engine/perfetto_sql_engine.h"
39 #include "src/trace_processor/sqlite/bindings/sqlite_result.h"
40 #include "src/trace_processor/sqlite/module_lifecycle_manager.h"
41 #include "src/trace_processor/sqlite/sql_source.h"
42 #include "src/trace_processor/sqlite/sqlite_utils.h"
43 #include "src/trace_processor/tp_metatrace.h"
44 #include "src/trace_processor/util/status_macros.h"
45 
46 #include "protos/perfetto/trace_processor/metatrace_categories.pbzero.h"
47 
48 namespace perfetto::trace_processor {
49 
50 namespace {
51 
// Names of the two columns that every table taking part in a span join must
// expose.
constexpr char kTsColumnName[] = "ts";
constexpr char kDurColumnName[] = "dur";

// Returns true iff |name| is exactly one of the mandatory span columns
// (|kTsColumnName| or |kDurColumnName|). The comparison is case sensitive.
bool IsRequiredColumn(const std::string& name) {
  const bool is_ts = name.compare(kTsColumnName) == 0;
  return is_ts || name.compare(kDurColumnName) == 0;
}
58 
HasDuplicateColumns(const std::vector<std::pair<SqlValue::Type,std::string>> & t1,const std::vector<std::pair<SqlValue::Type,std::string>> & t2,const std::optional<std::string> & partition_col)59 std::optional<std::string> HasDuplicateColumns(
60     const std::vector<std::pair<SqlValue::Type, std::string>>& t1,
61     const std::vector<std::pair<SqlValue::Type, std::string>>& t2,
62     const std::optional<std::string>& partition_col) {
63   std::unordered_set<std::string> seen_names;
64   for (const auto& col : t1) {
65     if (IsRequiredColumn(col.second) || col.second == partition_col) {
66       continue;
67     }
68     if (seen_names.count(col.second) > 0) {
69       return col.second;
70     }
71     seen_names.insert(col.second);
72   }
73   for (const auto& col : t2) {
74     if (IsRequiredColumn(col.second) || col.second == partition_col) {
75       continue;
76     }
77     if (seen_names.count(col.second) > 0) {
78       return col.second;
79     }
80     seen_names.insert(col.second);
81   }
82   return std::nullopt;
83 }
84 
OpToString(int op)85 std::string OpToString(int op) {
86   switch (op) {
87     case SQLITE_INDEX_CONSTRAINT_EQ:
88       return "=";
89     case SQLITE_INDEX_CONSTRAINT_NE:
90       return "!=";
91     case SQLITE_INDEX_CONSTRAINT_GE:
92       return ">=";
93     case SQLITE_INDEX_CONSTRAINT_GT:
94       return ">";
95     case SQLITE_INDEX_CONSTRAINT_LE:
96       return "<=";
97     case SQLITE_INDEX_CONSTRAINT_LT:
98       return "<";
99     case SQLITE_INDEX_CONSTRAINT_LIKE:
100       return " like ";
101     case SQLITE_INDEX_CONSTRAINT_GLOB:
102       return " glob ";
103     case SQLITE_INDEX_CONSTRAINT_ISNULL:
104       // The "null" will be added below in EscapedSqliteValueAsString.
105       return " is ";
106     case SQLITE_INDEX_CONSTRAINT_ISNOTNULL:
107       // The "null" will be added below in EscapedSqliteValueAsString.
108       return " is not ";
109     default:
110       PERFETTO_FATAL("Operator to string conversion not impemented for %d", op);
111   }
112 }
113 
EscapedSqliteValueAsString(sqlite3_value * value)114 std::string EscapedSqliteValueAsString(sqlite3_value* value) {
115   switch (sqlite3_value_type(value)) {
116     case SQLITE_INTEGER:
117       return std::to_string(sqlite3_value_int64(value));
118     case SQLITE_FLOAT:
119       return std::to_string(sqlite3_value_double(value));
120     case SQLITE_TEXT: {
121       // If str itself contains a single quote, we need to escape it with
122       // another single quote.
123       const char* str =
124           reinterpret_cast<const char*>(sqlite3_value_text(value));
125       return "'" + base::ReplaceAll(str, "'", "''") + "'";
126     }
127     case SQLITE_NULL:
128       return " null";
129     default:
130       PERFETTO_FATAL("Unknown value type %d", sqlite3_value_type(value));
131   }
132 }
133 
134 }  // namespace
135 
PopulateColumnLocatorMap(uint32_t offset)136 void SpanJoinOperatorModule::State::PopulateColumnLocatorMap(uint32_t offset) {
137   for (uint32_t i = 0; i < t1_defn.columns().size(); ++i) {
138     if (i == t1_defn.ts_idx() || i == t1_defn.dur_idx() ||
139         i == t1_defn.partition_idx()) {
140       continue;
141     }
142     ColumnLocator* locator = &global_index_to_column_locator[offset++];
143     locator->defn = &t1_defn;
144     locator->col_index = i;
145   }
146   for (uint32_t i = 0; i < t2_defn.columns().size(); ++i) {
147     if (i == t2_defn.ts_idx() || i == t2_defn.dur_idx() ||
148         i == t2_defn.partition_idx()) {
149       continue;
150     }
151     ColumnLocator* locator = &global_index_to_column_locator[offset++];
152     locator->defn = &t2_defn;
153     locator->col_index = i;
154   }
155 }
156 
BestIndexStrForDefinition(const sqlite3_index_info * info,const TableDefinition & defn)157 std::string SpanJoinOperatorModule::State::BestIndexStrForDefinition(
158     const sqlite3_index_info* info,
159     const TableDefinition& defn) {
160   uint32_t count = 0;
161   std::string constraints;
162   for (int i = 0; i < info->nConstraint; i++) {
163     const auto& c = info->aConstraint[i];
164     if (!c.usable) {
165       continue;
166     }
167 
168     auto col_name = GetNameForGlobalColumnIndex(defn, c.iColumn);
169     if (col_name.empty()) {
170       continue;
171     }
172 
173     // Le constraints can be passed straight to the child tables as they won't
174     // affect the span join computation. Similarily, source_geq constraints
175     // explicitly request that they are passed as geq constraints to the source
176     // tables.
177     if (col_name == kTsColumnName && !sqlite::utils::IsOpLe(c.op) &&
178         c.op != kSourceGeqOpCode) {
179       continue;
180     }
181 
182     // Allow SQLite handle any constraints on duration apart from source_geq
183     // constraints.
184     if (col_name == kDurColumnName && c.op != kSourceGeqOpCode) {
185       continue;
186     }
187 
188     // If we're emitting shadow slices, don't propogate any constraints
189     // on this table as this will break the shadow slice computation.
190     if (defn.ShouldEmitPresentPartitionShadow()) {
191       continue;
192     }
193 
194     PERFETTO_DCHECK(info->aConstraintUsage[i].argvIndex > 0);
195     std::string argvIndex =
196         std::to_string(info->aConstraintUsage[i].argvIndex - 1);
197     std::string op = OpToString(
198         c.op == kSourceGeqOpCode ? SQLITE_INDEX_CONSTRAINT_GE : c.op);
199     constraints += "," + argvIndex + "," + "`" + col_name + "`" + op;
200     count++;
201   }
202   return std::to_string(count) + constraints;
203 }
204 
Create(PerfettoSqlEngine * engine,const TableDescriptor & desc,EmitShadowType emit_shadow_type,TableDefinition * defn)205 base::Status SpanJoinOperatorModule::TableDefinition::Create(
206     PerfettoSqlEngine* engine,
207     const TableDescriptor& desc,
208     EmitShadowType emit_shadow_type,
209     TableDefinition* defn) {
210   if (desc.partition_col == kTsColumnName ||
211       desc.partition_col == kDurColumnName) {
212     return base::ErrStatus(
213         "SPAN_JOIN: partition column cannot be any of {ts, dur} for table %s",
214         desc.name.c_str());
215   }
216 
217   std::vector<std::pair<SqlValue::Type, std::string>> cols;
218   RETURN_IF_ERROR(sqlite::utils::GetColumnsForTable(
219       engine->sqlite_engine()->db(), desc.name, cols));
220 
221   uint32_t required_columns_found = 0;
222   uint32_t ts_idx = std::numeric_limits<uint32_t>::max();
223   uint32_t dur_idx = std::numeric_limits<uint32_t>::max();
224   uint32_t partition_idx = std::numeric_limits<uint32_t>::max();
225   for (uint32_t i = 0; i < cols.size(); i++) {
226     auto col = cols[i];
227     if (IsRequiredColumn(col.second)) {
228       ++required_columns_found;
229     }
230     if (base::Contains(col.second, ",")) {
231       return base::ErrStatus("SPAN_JOIN: column '%s' cannot contain any ','",
232                              col.second.c_str());
233     }
234     if (base::Contains(col.second, ':')) {
235       return base::ErrStatus("SPAN_JOIN: column '%s' cannot contain any ':'",
236                              col.second.c_str());
237     }
238 
239     if (col.second == kTsColumnName) {
240       ts_idx = i;
241     } else if (col.second == kDurColumnName) {
242       dur_idx = i;
243     } else if (col.second == desc.partition_col) {
244       partition_idx = i;
245     }
246   }
247   if (required_columns_found != 2) {
248     return base::ErrStatus(
249         "SPAN_JOIN: Missing one of columns {ts, dur} in table %s",
250         desc.name.c_str());
251   }
252   if (desc.IsPartitioned() && partition_idx >= cols.size()) {
253     return base::ErrStatus(
254         "SPAN_JOIN: Missing partition column '%s' in table '%s'",
255         desc.partition_col.c_str(), desc.name.c_str());
256   }
257 
258   PERFETTO_DCHECK(ts_idx < cols.size());
259   PERFETTO_DCHECK(dur_idx < cols.size());
260 
261   *defn = TableDefinition(desc.name, desc.partition_col, std::move(cols),
262                           emit_shadow_type, ts_idx, dur_idx, partition_idx);
263   return base::OkStatus();
264 }
265 
266 std::string
CreateVtabCreateTableSection() const267 SpanJoinOperatorModule::TableDefinition::CreateVtabCreateTableSection() const {
268   std::string cols;
269   for (const auto& col : columns()) {
270     if (IsRequiredColumn(col.second) || col.second == partition_col()) {
271       continue;
272     }
273     if (col.first == SqlValue::Type::kNull) {
274       cols += col.second + ",";
275     } else {
276       cols += col.second + " " +
277               sqlite::utils::SqlValueTypeToSqliteTypeName(col.first) + ",";
278     }
279   }
280   return cols;
281 }
282 
GetNameForGlobalColumnIndex(const TableDefinition & defn,int global_column)283 std::string SpanJoinOperatorModule::State::GetNameForGlobalColumnIndex(
284     const TableDefinition& defn,
285     int global_column) {
286   auto col_idx = static_cast<size_t>(global_column);
287   if (col_idx == Column::kTimestamp) {
288     return kTsColumnName;
289   }
290   if (col_idx == Column::kDuration) {
291     return kDurColumnName;
292   }
293   if (col_idx == Column::kPartition &&
294       partitioning != PartitioningType::kNoPartitioning) {
295     return defn.partition_col();
296   }
297 
298   const auto& locator = global_index_to_column_locator[col_idx];
299   if (locator.defn != &defn) {
300     return "";
301   }
302   return defn.columns()[locator.col_index].second;
303 }
304 
Query(SpanJoinOperatorModule::State * state,const TableDefinition * definition)305 SpanJoinOperatorModule::Query::Query(SpanJoinOperatorModule::State* state,
306                                      const TableDefinition* definition)
307     : defn_(definition), in_state_(state) {
308   PERFETTO_DCHECK(!defn_->IsPartitioned() ||
309                   defn_->partition_idx() < defn_->columns().size());
310 }
311 
312 SpanJoinOperatorModule::Query::~Query() = default;
313 
Initialize(std::string sql_query,InitialEofBehavior eof_behavior)314 base::Status SpanJoinOperatorModule::Query::Initialize(
315     std::string sql_query,
316     InitialEofBehavior eof_behavior) {
317   *this = Query(in_state_, definition());
318   sql_query_ = std::move(sql_query);
319   base::Status status = Rewind();
320   if (!status.ok())
321     return status;
322   if (eof_behavior == InitialEofBehavior::kTreatAsMissingPartitionShadow &&
323       IsEof()) {
324     state_ = State::kMissingPartitionShadow;
325   }
326   return status;
327 }
328 
Next()329 base::Status SpanJoinOperatorModule::Query::Next() {
330   RETURN_IF_ERROR(NextSliceState());
331   return FindNextValidSlice();
332 }
333 
IsValidSlice()334 bool SpanJoinOperatorModule::Query::IsValidSlice() {
335   // Disallow any single partition shadow slices if the definition doesn't allow
336   // them.
337   if (IsPresentPartitionShadow() && !defn_->ShouldEmitPresentPartitionShadow())
338     return false;
339 
340   // Disallow any missing partition shadow slices if the definition doesn't
341   // allow them.
342   if (IsMissingPartitionShadow() && !defn_->ShouldEmitMissingPartitionShadow())
343     return false;
344 
345   // Disallow any "empty" shadows; these are shadows which either have the same
346   // start and end time or missing-partition shadows which have the same start
347   // and end partition.
348   if (IsEmptyShadow())
349     return false;
350 
351   return true;
352 }
353 
FindNextValidSlice()354 base::Status SpanJoinOperatorModule::Query::FindNextValidSlice() {
355   // The basic idea of this function is that |NextSliceState()| always emits
356   // all possible slices (including shadows for any gaps inbetween the real
357   // slices) and we filter out the invalid slices (as defined by the table
358   // definition) using |IsValidSlice()|.
359   //
360   // This has proved to be a lot cleaner to implement than trying to choose
361   // when to emit and not emit shadows directly.
362   while (!IsEof() && !IsValidSlice()) {
363     RETURN_IF_ERROR(NextSliceState());
364   }
365   return base::OkStatus();
366 }
367 
NextSliceState()368 base::Status SpanJoinOperatorModule::Query::NextSliceState() {
369   switch (state_) {
370     case State::kReal: {
371       // Forward the cursor to figure out where the next slice should be.
372       RETURN_IF_ERROR(CursorNext());
373 
374       // Depending on the next slice, we can do two things here:
375       // 1. If the next slice is on the same partition, we can just emit a
376       //    single shadow until the start of the next slice.
377       // 2. If the next slice is on another partition or we hit eof, just emit
378       //    a shadow to the end of the whole partition.
379       bool shadow_to_end = cursor_eof_ || (defn_->IsPartitioned() &&
380                                            partition_ != CursorPartition());
381       state_ = State::kPresentPartitionShadow;
382       ts_ = AdjustedTsEnd();
383       ts_end_ =
384           shadow_to_end ? std::numeric_limits<int64_t>::max() : CursorTs();
385       return base::OkStatus();
386     }
387     case State::kPresentPartitionShadow: {
388       if (ts_end_ == std::numeric_limits<int64_t>::max()) {
389         // If the shadow is to the end of the slice, create a missing partition
390         // shadow to the start of the partition of the next slice or to the max
391         // partition if we hit eof.
392         state_ = State::kMissingPartitionShadow;
393         ts_ = 0;
394         ts_end_ = std::numeric_limits<int64_t>::max();
395 
396         missing_partition_start_ = partition_ + 1;
397         missing_partition_end_ = cursor_eof_
398                                      ? std::numeric_limits<int64_t>::max()
399                                      : CursorPartition();
400       } else {
401         // If the shadow is not to the end, we must have another slice on the
402         // current partition.
403         state_ = State::kReal;
404         ts_ = CursorTs();
405         ts_end_ = ts_ + CursorDur();
406 
407         PERFETTO_DCHECK(!defn_->IsPartitioned() ||
408                         partition_ == CursorPartition());
409       }
410       return base::OkStatus();
411     }
412     case State::kMissingPartitionShadow: {
413       if (missing_partition_end_ == std::numeric_limits<int64_t>::max()) {
414         PERFETTO_DCHECK(cursor_eof_);
415 
416         // If we have a missing partition to the max partition, we must have hit
417         // eof.
418         state_ = State::kEof;
419       } else {
420         PERFETTO_DCHECK(!defn_->IsPartitioned() ||
421                         CursorPartition() == missing_partition_end_);
422 
423         // Otherwise, setup a single partition slice on the end partition to the
424         // start of the next slice.
425         state_ = State::kPresentPartitionShadow;
426         ts_ = 0;
427         ts_end_ = CursorTs();
428         partition_ = missing_partition_end_;
429       }
430       return base::OkStatus();
431     }
432     case State::kEof: {
433       PERFETTO_DFATAL("Called Next when EOF");
434       return base::ErrStatus("Called Next when EOF");
435     }
436   }
437   PERFETTO_FATAL("For GCC");
438 }
439 
Rewind()440 base::Status SpanJoinOperatorModule::Query::Rewind() {
441   auto res = in_state_->engine->sqlite_engine()->PrepareStatement(
442       SqlSource::FromTraceProcessorImplementation(sql_query_));
443   cursor_eof_ = false;
444   RETURN_IF_ERROR(res.status());
445   stmt_ = std::move(res);
446 
447   RETURN_IF_ERROR(CursorNext());
448 
449   // Setup the first slice as a missing partition shadow from the lowest
450   // partition until the first slice partition. We will handle finding the real
451   // slice in |FindNextValidSlice()|.
452   state_ = State::kMissingPartitionShadow;
453   ts_ = 0;
454   ts_end_ = std::numeric_limits<int64_t>::max();
455   missing_partition_start_ = std::numeric_limits<int64_t>::min();
456 
457   if (cursor_eof_) {
458     missing_partition_end_ = std::numeric_limits<int64_t>::max();
459   } else if (defn_->IsPartitioned()) {
460     missing_partition_end_ = CursorPartition();
461   } else {
462     missing_partition_end_ = std::numeric_limits<int64_t>::min();
463   }
464 
465   // Actually compute the first valid slice.
466   return FindNextValidSlice();
467 }
468 
CursorNext()469 base::Status SpanJoinOperatorModule::Query::CursorNext() {
470   if (defn_->IsPartitioned()) {
471     auto partition_idx = static_cast<int>(defn_->partition_idx());
472     // Fastforward through any rows with null partition keys.
473     int row_type;
474     do {
475       cursor_eof_ = !stmt_->Step();
476       RETURN_IF_ERROR(stmt_->status());
477       row_type = sqlite3_column_type(stmt_->sqlite_stmt(), partition_idx);
478     } while (!cursor_eof_ && row_type == SQLITE_NULL);
479 
480     if (!cursor_eof_ && row_type != SQLITE_INTEGER) {
481       return base::ErrStatus("SPAN_JOIN: partition is not an INT column");
482     }
483   } else {
484     cursor_eof_ = !stmt_->Step();
485   }
486   return base::OkStatus();
487 }
488 
ReportSqliteResult(sqlite3_context * context,size_t index)489 void SpanJoinOperatorModule::Query::ReportSqliteResult(sqlite3_context* context,
490                                                        size_t index) {
491   if (state_ != State::kReal) {
492     return sqlite::result::Null(context);
493   }
494 
495   sqlite3_stmt* stmt = stmt_->sqlite_stmt();
496   int idx = static_cast<int>(index);
497   switch (sqlite3_column_type(stmt, idx)) {
498     case SQLITE_INTEGER:
499       return sqlite::result::Long(context, sqlite3_column_int64(stmt, idx));
500     case SQLITE_FLOAT:
501       return sqlite::result::Double(context, sqlite3_column_double(stmt, idx));
502     case SQLITE_TEXT: {
503       // TODO(lalitm): note for future optimizations: if we knew the addresses
504       // of the string intern pool, we could check if the string returned here
505       // comes from the pool, and pass it as non-transient.
506       const auto* ptr =
507           reinterpret_cast<const char*>(sqlite3_column_text(stmt, idx));
508       return sqlite::result::TransientString(context, ptr);
509     }
510     case SQLITE_BLOB: {
511       return sqlite::result::TransientBytes(context,
512                                             sqlite3_column_blob(stmt, idx),
513                                             sqlite3_column_bytes(stmt, idx));
514     }
515   }
516 }
517 
TableDefinition(std::string name,std::string partition_col,std::vector<std::pair<SqlValue::Type,std::string>> cols,EmitShadowType emit_shadow_type,uint32_t ts_idx,uint32_t dur_idx,uint32_t partition_idx)518 SpanJoinOperatorModule::TableDefinition::TableDefinition(
519     std::string name,
520     std::string partition_col,
521     std::vector<std::pair<SqlValue::Type, std::string>> cols,
522     EmitShadowType emit_shadow_type,
523     uint32_t ts_idx,
524     uint32_t dur_idx,
525     uint32_t partition_idx)
526     : emit_shadow_type_(emit_shadow_type),
527       name_(std::move(name)),
528       partition_col_(std::move(partition_col)),
529       cols_(std::move(cols)),
530       ts_idx_(ts_idx),
531       dur_idx_(dur_idx),
532       partition_idx_(partition_idx) {}
533 
Parse(const std::string & raw_descriptor,TableDescriptor * descriptor)534 base::Status SpanJoinOperatorModule::TableDescriptor::Parse(
535     const std::string& raw_descriptor,
536     TableDescriptor* descriptor) {
537   // Descriptors have one of the following forms:
538   // table_name [PARTITIONED column_name]
539 
540   // Find the table name.
541   base::StringSplitter splitter(raw_descriptor, ' ');
542   if (!splitter.Next())
543     return base::ErrStatus("SPAN_JOIN: Missing table name");
544 
545   descriptor->name = splitter.cur_token();
546   if (!splitter.Next())
547     return base::OkStatus();
548 
549   if (!base::CaseInsensitiveEqual(splitter.cur_token(), "PARTITIONED"))
550     return base::ErrStatus("SPAN_JOIN: Invalid token");
551 
552   if (!splitter.Next())
553     return base::ErrStatus("SPAN_JOIN: Missing partitioning column");
554 
555   descriptor->partition_col = splitter.cur_token();
556   return base::OkStatus();
557 }
558 
CreateSqlQuery(base::StringSplitter & idx,sqlite3_value ** argv) const559 std::string SpanJoinOperatorModule::TableDefinition::CreateSqlQuery(
560     base::StringSplitter& idx,
561     sqlite3_value** argv) const {
562   std::vector<std::string> col_names;
563   for (const auto& c : columns()) {
564     col_names.push_back("`" + c.second + "`");
565   }
566 
567   PERFETTO_CHECK(idx.Next());
568   std::optional<uint32_t> cs_count = base::StringToUInt32(idx.cur_token());
569   PERFETTO_CHECK(cs_count);
570   std::vector<std::string> cs;
571   cs.reserve(*cs_count);
572   for (uint32_t i = 0; i < *cs_count; ++i) {
573     PERFETTO_CHECK(idx.Next());
574     std::optional<uint32_t> argv_idx = base::StringToUInt32(idx.cur_token());
575     PERFETTO_CHECK(argv_idx);
576 
577     PERFETTO_CHECK(idx.Next());
578     cs.emplace_back(idx.cur_token() +
579                     EscapedSqliteValueAsString(argv[*argv_idx]));
580   }
581 
582   std::string sql = "SELECT " + base::Join(col_names, ", ");
583   sql += " FROM " + name();
584   if (!cs.empty()) {
585     sql += " WHERE " + base::Join(cs, " AND ");
586   }
587   sql += " ORDER BY ";
588   sql += IsPartitioned() ? base::Join({"`" + partition_col() + "`", "ts"}, ", ")
589                          : "ts";
590   sql += ";";
591   return sql;
592 }
593 
Create(sqlite3 * db,void * ctx,int argc,const char * const * argv,sqlite3_vtab ** vtab,char ** pzErr)594 int SpanJoinOperatorModule::Create(sqlite3* db,
595                                    void* ctx,
596                                    int argc,
597                                    const char* const* argv,
598                                    sqlite3_vtab** vtab,
599                                    char** pzErr) {
600   // argv[0] - argv[2] are SQLite populated fields which are always present.
601   if (argc < 5) {
602     *pzErr = sqlite3_mprintf("SPAN_JOIN: expected at least 2 args");
603     return SQLITE_ERROR;
604   }
605 
606   auto* context = GetContext(ctx);
607   auto state = std::make_unique<State>();
608   state->engine = context->engine;
609   state->module_name = argv[0];
610 
611   TableDescriptor t1_desc;
612   auto status = TableDescriptor::Parse(
613       std::string(reinterpret_cast<const char*>(argv[3])), &t1_desc);
614   if (!status.ok()) {
615     *pzErr = sqlite3_mprintf("%s", status.c_message());
616     return SQLITE_ERROR;
617   }
618 
619   TableDescriptor t2_desc;
620   status = TableDescriptor::Parse(
621       std::string(reinterpret_cast<const char*>(argv[4])), &t2_desc);
622   if (!status.ok()) {
623     *pzErr = sqlite3_mprintf("%s", status.c_message());
624     return SQLITE_ERROR;
625   }
626 
627   // Check that the partition columns match between the two tables.
628   if (t1_desc.partition_col == t2_desc.partition_col) {
629     state->partitioning = t1_desc.IsPartitioned()
630                               ? PartitioningType::kSamePartitioning
631                               : PartitioningType::kNoPartitioning;
632   } else if (t1_desc.IsPartitioned() && t2_desc.IsPartitioned()) {
633     *pzErr = sqlite3_mprintf(
634         "SPAN_JOIN: mismatching partitions between the two tables; "
635         "(partition %s in table %s, partition %s in table %s)",
636         t1_desc.partition_col.c_str(), t1_desc.name.c_str(),
637         t2_desc.partition_col.c_str(), t2_desc.name.c_str());
638     return SQLITE_ERROR;
639   } else {
640     state->partitioning = PartitioningType::kMixedPartitioning;
641   }
642 
643   bool t1_part_mixed =
644       t1_desc.IsPartitioned() &&
645       state->partitioning == PartitioningType::kMixedPartitioning;
646   bool t2_part_mixed =
647       t2_desc.IsPartitioned() &&
648       state->partitioning == PartitioningType::kMixedPartitioning;
649 
650   EmitShadowType t1_shadow_type;
651   if (state->IsOuterJoin()) {
652     if (t1_part_mixed ||
653         state->partitioning == PartitioningType::kNoPartitioning) {
654       t1_shadow_type = EmitShadowType::kPresentPartitionOnly;
655     } else {
656       t1_shadow_type = EmitShadowType::kAll;
657     }
658   } else {
659     t1_shadow_type = EmitShadowType::kNone;
660   }
661   status = TableDefinition::Create(state->engine, t1_desc, t1_shadow_type,
662                                    &state->t1_defn);
663   if (!status.ok()) {
664     *pzErr = sqlite3_mprintf("%s", status.c_message());
665     return SQLITE_ERROR;
666   }
667 
668   EmitShadowType t2_shadow_type;
669   if (state->IsOuterJoin() || state->IsLeftJoin()) {
670     if (t2_part_mixed ||
671         state->partitioning == PartitioningType::kNoPartitioning) {
672       t2_shadow_type = EmitShadowType::kPresentPartitionOnly;
673     } else {
674       t2_shadow_type = EmitShadowType::kAll;
675     }
676   } else {
677     t2_shadow_type = EmitShadowType::kNone;
678   }
679   status = TableDefinition::Create(state->engine, t2_desc, t2_shadow_type,
680                                    &state->t2_defn);
681   if (!status.ok()) {
682     *pzErr = sqlite3_mprintf("%s", status.c_message());
683     return SQLITE_ERROR;
684   }
685 
686   if (auto dupe = HasDuplicateColumns(
687           state->t1_defn.columns(), state->t2_defn.columns(),
688           state->partitioning == PartitioningType::kNoPartitioning
689               ? std::nullopt
690               : std::make_optional(state->partition_col()))) {
691     *pzErr = sqlite3_mprintf(
692         "SPAN_JOIN: column %s present in both tables %s and %s", dupe->c_str(),
693         state->t1_defn.name().c_str(), state->t2_defn.name().c_str());
694     return SQLITE_ERROR;
695   }
696 
697   // Create the map from column index to the column in the child sub-queries.
698   state->PopulateColumnLocatorMap(
699       state->partitioning == PartitioningType::kNoPartitioning ? 2 : 3);
700 
701   std::string primary_key = "ts";
702   std::string partition;
703   if (state->partitioning != PartitioningType::kNoPartitioning) {
704     partition = state->partition_col() + " BIGINT,";
705     primary_key += ", " + state->partition_col();
706   }
707   std::string t1_section = state->t1_defn.CreateVtabCreateTableSection();
708   std::string t2_section = state->t2_defn.CreateVtabCreateTableSection();
709   static constexpr char kStmt[] = R"(
710     CREATE TABLE x(
711       ts BIGINT,
712       dur BIGINT,
713       %s
714       %s
715       %s
716       PRIMARY KEY(%s)
717     )
718   )";
719   base::StackString<1024> create_table_str(
720       kStmt, partition.c_str(), t1_section.c_str(), t2_section.c_str(),
721       primary_key.c_str());
722   state->create_table_stmt = create_table_str.ToStdString();
723   if (int ret = sqlite3_declare_vtab(db, create_table_str.c_str());
724       ret != SQLITE_OK) {
725     return ret;
726   }
727 
728   std::unique_ptr<Vtab> res = std::make_unique<Vtab>();
729   res->state = context->manager.OnCreate(argv, std::move(state));
730   *vtab = res.release();
731   return SQLITE_OK;
732 }
733 
Destroy(sqlite3_vtab * vtab)734 int SpanJoinOperatorModule::Destroy(sqlite3_vtab* vtab) {
735   std::unique_ptr<Vtab> tab(GetVtab(vtab));
736   sqlite::ModuleStateManager<SpanJoinOperatorModule>::OnDestroy(tab->state);
737   return SQLITE_OK;
738 }
739 
Connect(sqlite3 * db,void * ctx,int,const char * const * argv,sqlite3_vtab ** vtab,char **)740 int SpanJoinOperatorModule::Connect(sqlite3* db,
741                                     void* ctx,
742                                     int,
743                                     const char* const* argv,
744                                     sqlite3_vtab** vtab,
745                                     char**) {
746   auto* context = GetContext(ctx);
747   std::unique_ptr<Vtab> res = std::make_unique<Vtab>();
748   res->state = context->manager.OnConnect(argv);
749 
750   auto* state =
751       sqlite::ModuleStateManager<SpanJoinOperatorModule>::GetState(res->state);
752   if (int ret = sqlite3_declare_vtab(db, state->create_table_stmt.c_str());
753       ret != SQLITE_OK) {
754     return ret;
755   }
756   *vtab = res.release();
757   return SQLITE_OK;
758 }
759 
Disconnect(sqlite3_vtab * vtab)760 int SpanJoinOperatorModule::Disconnect(sqlite3_vtab* vtab) {
761   std::unique_ptr<Vtab> tab(GetVtab(vtab));
762   sqlite::ModuleStateManager<SpanJoinOperatorModule>::OnDisconnect(tab->state);
763   return SQLITE_OK;
764 }
765 
BestIndex(sqlite3_vtab * tab,sqlite3_index_info * info)766 int SpanJoinOperatorModule::BestIndex(sqlite3_vtab* tab,
767                                       sqlite3_index_info* info) {
768   int argvIndex = 1;
769   for (int i = 0; i < info->nConstraint; ++i) {
770     if (!info->aConstraint[i].usable) {
771       continue;
772     }
773     info->aConstraintUsage[i].argvIndex = argvIndex++;
774   }
775 
776   Vtab* table = GetVtab(tab);
777   State* state = sqlite::ModuleStateManager<SpanJoinOperatorModule>::GetState(
778       table->state);
779   if (state->partitioning == PartitioningType::kNoPartitioning) {
780     // If both tables are not partitioned and we have a single order by on ts,
781     // we return data in the correct order.
782     info->orderByConsumed = info->nOrderBy == 1 &&
783                             info->aOrderBy[0].iColumn == Column::kTimestamp &&
784                             !info->aOrderBy[0].desc;
785   } else {
786     // If one of the tables is partitioned, and we have an order by on the
787     // partition column followed (optionally) by an order by on timestamp, we
788     // return data in the correct order.
789     bool is_first_ob_partition =
790         info->nOrderBy > 0 && info->aOrderBy[0].iColumn == Column::kPartition &&
791         !info->aOrderBy[0].desc;
792     bool is_second_ob_ts = info->nOrderBy >= 2 &&
793                            info->aOrderBy[1].iColumn == Column::kTimestamp &&
794                            !info->aOrderBy[1].desc;
795     info->orderByConsumed =
796         (info->nOrderBy == 1 && is_first_ob_partition) ||
797         (info->nOrderBy == 2 && is_first_ob_partition && is_second_ob_ts);
798   }
799 
800   for (int i = 0; i < info->nConstraint; ++i) {
801     if (info->aConstraint[i].op == kSourceGeqOpCode) {
802       info->aConstraintUsage[i].omit = true;
803     }
804   }
805 
806   std::string t1 = state->BestIndexStrForDefinition(info, state->t1_defn);
807   std::string t2 = state->BestIndexStrForDefinition(info, state->t2_defn);
808   info->idxStr = sqlite3_mprintf("%s,%s", t1.c_str(), t2.c_str());
809   info->needToFreeIdxStr = true;
810 
811   return SQLITE_OK;
812 }
813 
Open(sqlite3_vtab * tab,sqlite3_vtab_cursor ** cursor)814 int SpanJoinOperatorModule::Open(sqlite3_vtab* tab,
815                                  sqlite3_vtab_cursor** cursor) {
816   State* state = sqlite::ModuleStateManager<SpanJoinOperatorModule>::GetState(
817       GetVtab(tab)->state);
818   std::unique_ptr<Cursor> c = std::make_unique<Cursor>(state);
819   *cursor = c.release();
820   return SQLITE_OK;
821 }
822 
Close(sqlite3_vtab_cursor * cursor)823 int SpanJoinOperatorModule::Close(sqlite3_vtab_cursor* cursor) {
824   std::unique_ptr<Cursor> c(GetCursor(cursor));
825   return SQLITE_OK;
826 }
827 
Filter(sqlite3_vtab_cursor * cursor,int,const char * idxStr,int,sqlite3_value ** argv)828 int SpanJoinOperatorModule::Filter(sqlite3_vtab_cursor* cursor,
829                                    int,
830                                    const char* idxStr,
831                                    int,
832                                    sqlite3_value** argv) {
833   PERFETTO_TP_TRACE(metatrace::Category::QUERY_DETAILED, "SPAN_JOIN_XFILTER");
834 
835   Cursor* c = GetCursor(cursor);
836   Vtab* table = GetVtab(cursor->pVtab);
837   State* state = sqlite::ModuleStateManager<SpanJoinOperatorModule>::GetState(
838       table->state);
839 
840   base::StringSplitter splitter(std::string(idxStr), ',');
841   bool t1_partitioned_mixed =
842       c->t1.definition()->IsPartitioned() &&
843       state->partitioning == PartitioningType::kMixedPartitioning;
844   auto t1_eof = state->IsOuterJoin() && !t1_partitioned_mixed
845                     ? Query::InitialEofBehavior::kTreatAsMissingPartitionShadow
846                     : Query::InitialEofBehavior::kTreatAsEof;
847   base::Status status =
848       c->t1.Initialize(state->t1_defn.CreateSqlQuery(splitter, argv), t1_eof);
849   if (!status.ok()) {
850     return sqlite::utils::SetError(table, status.c_message());
851   }
852 
853   bool t2_partitioned_mixed =
854       c->t2.definition()->IsPartitioned() &&
855       state->partitioning == PartitioningType::kMixedPartitioning;
856   auto t2_eof =
857       (state->IsLeftJoin() || state->IsOuterJoin()) && !t2_partitioned_mixed
858           ? Query::InitialEofBehavior::kTreatAsMissingPartitionShadow
859           : Query::InitialEofBehavior::kTreatAsEof;
860   status =
861       c->t2.Initialize(state->t2_defn.CreateSqlQuery(splitter, argv), t2_eof);
862   if (!status.ok()) {
863     return sqlite::utils::SetError(table, status.c_message());
864   }
865 
866   status = c->FindOverlappingSpan();
867   if (!status.ok()) {
868     return sqlite::utils::SetError(table, status.c_message());
869   }
870   return SQLITE_OK;
871 }
872 
Next(sqlite3_vtab_cursor * cursor)873 int SpanJoinOperatorModule::Next(sqlite3_vtab_cursor* cursor) {
874   Cursor* c = GetCursor(cursor);
875   Vtab* table = GetVtab(cursor->pVtab);
876   base::Status status = c->next_query->Next();
877   if (!status.ok()) {
878     return sqlite::utils::SetError(table, status.c_message());
879   }
880   status = c->FindOverlappingSpan();
881   if (!status.ok()) {
882     return sqlite::utils::SetError(table, status.c_message());
883   }
884   return SQLITE_OK;
885 }
886 
Eof(sqlite3_vtab_cursor * cur)887 int SpanJoinOperatorModule::Eof(sqlite3_vtab_cursor* cur) {
888   Cursor* c = GetCursor(cur);
889   return c->t1.IsEof() || c->t2.IsEof();
890 }
891 
Column(sqlite3_vtab_cursor * cursor,sqlite3_context * context,int N)892 int SpanJoinOperatorModule::Column(sqlite3_vtab_cursor* cursor,
893                                    sqlite3_context* context,
894                                    int N) {
895   Cursor* c = GetCursor(cursor);
896   Vtab* table = GetVtab(cursor->pVtab);
897   State* state = sqlite::ModuleStateManager<SpanJoinOperatorModule>::GetState(
898       table->state);
899 
900   PERFETTO_DCHECK(c->t1.IsReal() || c->t2.IsReal());
901 
902   switch (N) {
903     case Column::kTimestamp: {
904       auto max_ts = std::max(c->t1.ts(), c->t2.ts());
905       sqlite::result::Long(context, static_cast<sqlite3_int64>(max_ts));
906       break;
907     }
908     case Column::kDuration: {
909       auto max_start = std::max(c->t1.ts(), c->t2.ts());
910       auto min_end = std::min(c->t1.raw_ts_end(), c->t2.raw_ts_end());
911       auto dur = min_end - max_start;
912       sqlite::result::Long(context, static_cast<sqlite3_int64>(dur));
913       break;
914     }
915     case Column::kPartition: {
916       if (state->partitioning != PartitioningType::kNoPartitioning) {
917         int64_t partition;
918         if (state->partitioning == PartitioningType::kMixedPartitioning) {
919           partition = c->last_mixed_partition_;
920         } else {
921           partition = c->t1.IsReal() ? c->t1.partition() : c->t2.partition();
922         }
923         sqlite::result::Long(context, static_cast<sqlite3_int64>(partition));
924         break;
925       }
926       PERFETTO_FALLTHROUGH;
927     }
928     default: {
929       const auto* locator =
930           state->global_index_to_column_locator.Find(static_cast<size_t>(N));
931       PERFETTO_CHECK(locator);
932       if (locator->defn == c->t1.definition()) {
933         c->t1.ReportSqliteResult(context, locator->col_index);
934       } else {
935         c->t2.ReportSqliteResult(context, locator->col_index);
936       }
937     }
938   }
939   return SQLITE_OK;
940 }
941 
Rowid(sqlite3_vtab_cursor *,sqlite_int64 *)942 int SpanJoinOperatorModule::Rowid(sqlite3_vtab_cursor*, sqlite_int64*) {
943   return SQLITE_ERROR;
944 }
945 
FindFunction(sqlite3_vtab *,int,const char * name,FindFunctionFn ** fn,void **)946 int SpanJoinOperatorModule::FindFunction(sqlite3_vtab*,
947                                          int,
948                                          const char* name,
949                                          FindFunctionFn** fn,
950                                          void**) {
951   if (base::CaseInsensitiveEqual(name, "source_geq")) {
952     *fn = [](sqlite3_context* ctx, int, sqlite3_value**) {
953       return sqlite::result::Error(ctx, "Should not be called.");
954     };
955     return kSourceGeqOpCode;
956   }
957   return 0;
958 }
959 
IsOverlappingSpan() const960 bool SpanJoinOperatorModule::Cursor::IsOverlappingSpan() const {
961   // If either of the tables are eof, then we cannot possibly have an
962   // overlapping span.
963   if (t1.IsEof() || t2.IsEof())
964     return false;
965 
966   // One of the tables always needs to have a real span to have a valid
967   // overlapping span.
968   if (!t1.IsReal() && !t2.IsReal())
969     return false;
970 
971   using PartitioningType = PartitioningType;
972   if (state->partitioning == PartitioningType::kSamePartitioning) {
973     // If both tables are partitioned, then ensure that the partitions overlap.
974     bool partition_in_bounds = (t1.FirstPartition() >= t2.FirstPartition() &&
975                                 t1.FirstPartition() <= t2.LastPartition()) ||
976                                (t2.FirstPartition() >= t1.FirstPartition() &&
977                                 t2.FirstPartition() <= t1.LastPartition());
978     if (!partition_in_bounds)
979       return false;
980   }
981 
982   // We consider all slices to be [start, end) - that is the range of
983   // timestamps has an open interval at the start but a closed interval
984   // at the end. (with the exception of dur == -1 which we treat as if
985   // end == start for the purpose of this function).
986   return (t1.ts() == t2.ts() && t1.IsReal() && t2.IsReal()) ||
987          (t1.ts() >= t2.ts() && t1.ts() < t2.AdjustedTsEnd()) ||
988          (t2.ts() >= t1.ts() && t2.ts() < t1.AdjustedTsEnd());
989 }
990 
FindOverlappingSpan()991 base::Status SpanJoinOperatorModule::Cursor::FindOverlappingSpan() {
992   // We loop until we find a slice which overlaps from the two tables.
993   while (true) {
994     if (state->partitioning == PartitioningType::kMixedPartitioning) {
995       // If we have a mixed partition setup, we need to have special checks
996       // for eof and to reset the unpartitioned cursor every time the partition
997       // changes in the partitioned table.
998       auto* partitioned = t1.definition()->IsPartitioned() ? &t1 : &t2;
999       auto* unpartitioned = t1.definition()->IsPartitioned() ? &t2 : &t1;
1000 
1001       // If the partitioned table reaches eof, then we are really done.
1002       if (partitioned->IsEof())
1003         break;
1004 
1005       // If the partition has changed from the previous one, reset the cursor
1006       // and keep a lot of the new partition.
1007       if (last_mixed_partition_ != partitioned->partition()) {
1008         base::Status status = unpartitioned->Rewind();
1009         if (!status.ok())
1010           return status;
1011         last_mixed_partition_ = partitioned->partition();
1012       }
1013     } else if (t1.IsEof() || t2.IsEof()) {
1014       // For both no partition and same partition cases, either cursor ending
1015       // ends the whole span join.
1016       break;
1017     }
1018 
1019     // Find which slice finishes first.
1020     next_query = FindEarliestFinishQuery();
1021 
1022     // If the current span is overlapping, just finish there to emit the current
1023     // slice.
1024     if (IsOverlappingSpan())
1025       break;
1026 
1027     // Otherwise, step to the next row.
1028     base::Status status = next_query->Next();
1029     if (!status.ok())
1030       return status;
1031   }
1032   return base::OkStatus();
1033 }
1034 
1035 SpanJoinOperatorModule::Query*
FindEarliestFinishQuery()1036 SpanJoinOperatorModule::Cursor::FindEarliestFinishQuery() {
1037   int64_t t1_part;
1038   int64_t t2_part;
1039 
1040   switch (state->partitioning) {
1041     case PartitioningType::kMixedPartitioning: {
1042       // If either table is EOF, forward the other table to try and make
1043       // the partitions not match anymore.
1044       if (t1.IsEof())
1045         return &t2;
1046       if (t2.IsEof())
1047         return &t1;
1048 
1049       // Otherwise, just make the partition equal from both tables.
1050       t1_part = last_mixed_partition_;
1051       t2_part = last_mixed_partition_;
1052       break;
1053     }
1054     case PartitioningType::kSamePartitioning: {
1055       // Get the partition values from the cursor.
1056       t1_part = t1.LastPartition();
1057       t2_part = t2.LastPartition();
1058       break;
1059     }
1060     case PartitioningType::kNoPartitioning: {
1061       t1_part = 0;
1062       t2_part = 0;
1063       break;
1064     }
1065   }
1066 
1067   // Prefer to forward the earliest cursors based on the following
1068   // lexiographical ordering:
1069   // 1. partition
1070   // 2. end timestamp
1071   // 3. whether the slice is real or shadow (shadow < real)
1072   bool t1_less = std::make_tuple(t1_part, t1.AdjustedTsEnd(), t1.IsReal()) <
1073                  std::make_tuple(t2_part, t2.AdjustedTsEnd(), t2.IsReal());
1074   return t1_less ? &t1 : &t2;
1075 }
1076 
1077 }  // namespace perfetto::trace_processor
1078