xref: /aosp_15_r20/external/perfetto/ui/src/trace_processor/query_result.ts (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1// Copyright (C) 2021 The Android Open Source Project
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// This file deals with deserialization and iteration of the proto-encoded
16// byte buffer that is returned by TraceProcessor when invoking the
17// TPM_QUERY_STREAMING method. The returned |query_result| buffer is optimized
// for being moved cheaply across workers and decoded on-the-fly as we step
19// through the iterator.
20// See comments around QueryResult in trace_processor.proto for more details.
21
22// The classes in this file are organized as follows:
23//
24// QueryResultImpl:
25// The object returned by the Engine.query(sql) method.
// This object is a holder of row data. Batches of rows get appended
27// incrementally as they are received by the remote TraceProcessor instance.
28// QueryResultImpl also deals with asynchronicity of queries and allows callers
29// to obtain a promise that waits for more (or all) rows.
30// At any point in time the following objects hold a reference to QueryResult:
31// - The Engine: for appending row batches.
32// - UI code, typically controllers, who make queries.
33//
34// ResultBatch:
// Holds the data, returned by the remote TraceProcessor instance, for a number
36// of rows (TP typically chunks the results in batches of 128KB).
37// A QueryResultImpl holds exclusively ResultBatches for a given query.
38// ResultBatch is not exposed externally, it's just an internal representation
39// that helps with proto decoding. ResultBatch is immutable after it gets
40// appended and decoded. The iteration state is held by the RowIteratorImpl.
41//
42// RowIteratorImpl:
43// Decouples the data owned by QueryResultImpl (and its ResultBatch(es)) from
44// the iteration state. The iterator effectively is the union of a ResultBatch
45// and the row number in it. Rows within the batch are decoded as the user calls
46// next(). When getting at the end of the batch, it takes care of switching to
47// the next batch (if any) within the QueryResultImpl.
48// This object is part of the API exposed to tracks / controllers.
49
50// Ensure protobuf is initialized.
51import '../base/static_initializers';
52import protobuf from 'protobufjs/minimal';
53import {defer, Deferred} from '../base/deferred';
54import {assertExists, assertFalse, assertTrue} from '../base/logging';
55import {utf8Decode} from '../base/string_utils';
56import {Duration, duration, Time, time} from '../base/time';
57
// Any value a decoded SQL cell can hold: strings, numbers (FLOAT64 cells, or
// VARINT cells requested as NUM/NUM_NULL), bigints (VARINT cells otherwise),
// null, or raw bytes (BLOB cells).
export type SqlValue = string | number | bigint | null | Uint8Array;
// TODO(altimin): Replace ColumnType with SqlValue across the codebase and
// remove export here.
export type ColumnType = SqlValue;

// Sentinel values used in iterator specs, e.g. `iter({id: NUM, name: STR})`,
// to declare the expected type of each column. They are compared with `===`,
// so every sentinel must be distinct from all the others: the *_NULL variants
// use values different from their non-null counterparts, and BLOB/BLOB_NULL
// are two distinct Uint8Array instances told apart by object identity.
// The declared type of each sentinel (e.g. `number | null` for NUM_NULL)
// becomes the TypeScript type of the matching field on the returned
// RowIterator.
export const UNKNOWN: ColumnType = null;
export const NUM = 0;
export const STR = 'str';
export const NUM_NULL: number | null = 1;
export const STR_NULL: string | null = 'str_null';
export const BLOB: Uint8Array = new Uint8Array();
export const BLOB_NULL: Uint8Array | null = new Uint8Array();
export const LONG: bigint = 0n;
export const LONG_NULL: bigint | null = 1n;
72
const SHIFT_32BITS = 32n;

// Fast decode of a protobuf varint-encoded int64 into a bigint.
// Inspired by
// https://github.com/protobufjs/protobuf.js/blob/56b1e64979dae757b67a21d326e16acee39f2267/src/reader.js#L123
//
// The low 32 bits are accumulated into |lo| and the high 32 bits into |hi|,
// both kept as unsigned 32-bit numbers via `>>> 0`, and combined into a bigint
// at the end. Encodings that reach the high word are reinterpreted as signed
// via BigInt.asIntN(64, ...), so e.g. the 10-byte encoding of -1 yields -1n.
//
// @param buf the buffer containing the varint.
// @param pos the offset in |buf| of the varint's first byte.
// @returns the decoded signed 64-bit value.
// @throws Error('Index out of range') if the varint runs past the end of
//     |buf|; Error('invalid varint encoding') if it is longer than 10 bytes.
export function decodeInt64Varint(buf: Uint8Array, pos: number): bigint {
  let hi = 0;
  let lo = 0;
  let i = 0;

  if (buf.length - pos > 4) {
    // fast route (lo): at least 5 bytes are available, no bounds checks needed.
    for (; i < 4; ++i) {
      // 1st..4th
      lo = (lo | ((buf[pos] & 127) << (i * 7))) >>> 0;
      if (buf[pos++] < 128) {
        return BigInt(lo);
      }
    }
    // 5th: its low 4 payload bits complete |lo|, its upper 3 bits start |hi|.
    lo = (lo | ((buf[pos] & 127) << 28)) >>> 0;
    hi = (hi | ((buf[pos] & 127) >> 4)) >>> 0;
    if (buf[pos++] < 128) {
      return (BigInt(hi) << SHIFT_32BITS) | BigInt(lo);
    }
    i = 0;
  } else {
    // slow route (lo): at most 4 bytes are left, so a well-formed varint must
    // terminate within them. Every byte is bounds-checked, including the 4th.
    for (; i < 4; ++i) {
      if (pos >= buf.length) {
        throw Error('Index out of range');
      }
      // 1st..4th
      lo = (lo | ((buf[pos] & 127) << (i * 7))) >>> 0;
      if (buf[pos++] < 128) {
        return BigInt(lo);
      }
    }
    // The 4th byte had its continuation bit set, but the buffer ends here:
    // the varint is truncated.
    throw Error('Index out of range');
  }
  if (buf.length - pos > 4) {
    // fast route (hi)
    for (; i < 5; ++i) {
      // 6th..10th
      hi = (hi | ((buf[pos] & 127) << (i * 7 + 3))) >>> 0;
      if (buf[pos++] < 128) {
        const big = (BigInt(hi) << SHIFT_32BITS) | BigInt(lo);
        return BigInt.asIntN(64, big);
      }
    }
  } else {
    for (; i < 5; ++i) {
      if (pos >= buf.length) {
        throw Error('Index out of range');
      }
      // 6th..10th
      hi = (hi | ((buf[pos] & 127) << (i * 7 + 3))) >>> 0;
      if (buf[pos++] < 128) {
        const big = (BigInt(hi) << SHIFT_32BITS) | BigInt(lo);
        return BigInt.asIntN(64, big);
      }
    }
  }
  // Ten bytes consumed and the continuation bit is still set.
  throw Error('invalid varint encoding');
}
139
// Debug context attached to a failed query. It could eventually carry more
// (the stack where the query was issued, the active plugin, etc.); today it
// only holds the SQL text.
export interface QueryErrorInfo {
  query: string;
}

// Error raised for a failed query. Besides the message it remembers the SQL
// that caused it, and toString() appends that SQL on its own lines so logs
// show both the failure and the offending query.
export class QueryError extends Error {
  readonly query: string;

  constructor(message: string, info: QueryErrorInfo) {
    super(message);
    const {query} = info;
    this.query = query;
  }

  toString() {
    const header = super.toString();
    return `${header}\nQuery:\n${this.query}`;
  }
}
159
// One row extracted from an SQL result, keyed by column name.
export interface Row {
  [key: string]: ColumnType;
}

// The methods that any iterator has to implement.
export interface RowIteratorBase {
  // True while the iterator points at a row.
  valid(): boolean;
  // Moves to the next row; call valid() afterwards to check whether one exists.
  next(): void;

  // Reflection support for cases where the column names are not known upfront
  // (e.g. the query result table for user-provided SQL queries).
  // It throws if the passed column name doesn't exist.
  // Example usage:
  // for (const it = queryResult.iter({}); it.valid(); it.next()) {
  //   for (const columnName of queryResult.columns()) {
  //      console.log(it.get(columnName));
  //   }
  // }
  get(columnName: string): ColumnType;
}

// A RowIterator is a type that has all the fields defined in the query spec
// plus the valid() and next() operators. This is to ultimately allow the
// clients to do:
// const queryResult = await engine.query("select name, surname, id from people;");
// const iter = queryResult.iter({name: STR, surname: STR, id: NUM});
// for (; iter.valid(); iter.next())
//  console.log(iter.name, iter.surname);
export type RowIterator<T extends Row> = RowIteratorBase & T;
188
// Maps a column-type sentinel (NUM, STR_NULL, BLOB, ...) to its printable
// name, or to `INVALID(<value>)` when |t| is none of them. Sentinels are
// matched with `===`, so BLOB and BLOB_NULL are distinguished by identity.
function columnTypeToString(t: ColumnType): string {
  const knownTypes: Array<[ColumnType, string]> = [
    [NUM, 'NUM'],
    [NUM_NULL, 'NUM_NULL'],
    [STR, 'STR'],
    [STR_NULL, 'STR_NULL'],
    [BLOB, 'BLOB'],
    [BLOB_NULL, 'BLOB_NULL'],
    [LONG, 'LONG'],
    [LONG_NULL, 'LONG_NULL'],
    [UNKNOWN, 'UNKNOWN'],
  ];
  const match = knownTypes.find(([sentinel]) => sentinel === t);
  if (match === undefined) {
    return `INVALID(${t})`;
  }
  return match[1];
}
213
// Tells whether a cell of wire type |actual| may be returned for a column whose
// spec asked for |expected|. UNKNOWN accepts every valid cell type; NULL cells
// only fit the *_NULL sentinels. Throws for a CellType outside the enum.
function isCompatible(actual: CellType, expected: ColumnType): boolean {
  let accepted: ColumnType[];
  if (actual === CellType.CELL_NULL) {
    accepted = [NUM_NULL, STR_NULL, BLOB_NULL, LONG_NULL];
  } else if (actual === CellType.CELL_VARINT) {
    accepted = [NUM, NUM_NULL, LONG, LONG_NULL];
  } else if (actual === CellType.CELL_FLOAT64) {
    accepted = [NUM, NUM_NULL];
  } else if (actual === CellType.CELL_STRING) {
    accepted = [STR, STR_NULL];
  } else if (actual === CellType.CELL_BLOB) {
    accepted = [BLOB, BLOB_NULL];
  } else {
    throw new Error(`Unknown CellType ${actual}`);
  }
  // Strict-equality match so that BLOB/BLOB_NULL compare by identity.
  return expected === UNKNOWN || accepted.some((t) => t === expected);
}
244
// This has to match CellType in trace_processor.proto.
// The iterator compares these values directly against the raw cell-type bytes
// of a batch, so they must never be renumbered.
enum CellType {
  CELL_NULL = 1,
  CELL_VARINT = 2,
  CELL_FLOAT64 = 3,
  CELL_STRING = 4,
  CELL_BLOB = 5,
}

// Printable names of the CellType values, indexed by the enum's numeric value
// (index 0 is not a valid CellType).
const CELL_TYPE_NAMES = [
  'UNKNOWN',
  'NULL',
  'VARINT',
  'FLOAT64',
  'STRING',
  'BLOB',
];

// Protobuf wire type of length-delimited fields (strings, bytes and packed
// repeated fields), i.e. the expected value of the low 3 bits of their tag.
const TAG_LEN_DELIM = 2;
264
// This is the interface exposed to readers (e.g. tracks). The underlying object
// (QueryResultImpl) owns the result data. This allows to obtain iterators on
// that. In future it will allow to wait for incremental updates (new rows being
// fetched) for streaming queries.
export interface QueryResult {
  // Obtains an iterator.
  // TODO(primiano): this should have an option to destruct data as we read. In
  // the case of a long query (e.g. `SELECT * FROM sched` in the query prompt)
  // we don't want to accumulate everything in memory. OTOH UI tracks want to
  // keep the data around so they can redraw them on each animation frame. For
  // now we keep everything in memory in the QueryResultImpl object.
  iter<T extends Row>(spec: T): RowIterator<T>;

  // Like iter() for queries that expect only one row. It embeds the valid()
  // check (i.e. throws if no rows are available) and returns directly the
  // first result.
  firstRow<T extends Row>(spec: T): T;

  // Like firstRow() but returns undefined if no rows are available.
  maybeFirstRow<T extends Row>(spec: T): T | undefined;

  // If != undefined the query errored out and error() contains the message.
  error(): string | undefined;

  // Returns the number of rows accumulated so far. Note that this number can
  // change over time as more batches are received. It becomes stable only
  // when isComplete() returns true or after waitAllRows() is resolved.
  numRows(): number;

  // If true all rows have been fetched. Calling iter() will iterate through the
  // last row. If false, iter() will return an iterator which might iterate
  // through some rows (or none) but will surely not reach the end.
  isComplete(): boolean;

  // Returns a promise that is resolved only when all rows (i.e. all batches)
  // have been fetched. The promise return value is always the object itself.
  // If the query failed, the promise is rejected with a QueryError instead.
  // Note: QueryResultImpl asserts that this is called at most once.
  waitAllRows(): Promise<QueryResult>;

  // Returns a promise that is resolved when either:
  // - more rows are available
  // - all rows are available
  // The promise return value is always the object itself. As with
  // waitAllRows(), it is rejected with a QueryError if the query failed.
  waitMoreRows(): Promise<QueryResult>;

  // Can return an empty array if called before the first batch is resolved.
  // This should be called only after having awaited for at least one batch.
  columns(): string[];

  // Returns the number of SQL statements in the query
  // (e.g. 2 for 'SELECT 1; SELECT 2;').
  statementCount(): number;

  // Returns the number of SQL statements that produced output rows. This
  // number is <= statementCount().
  statementWithOutputCount(): number;

  // Returns the last SQL statement.
  lastStatementSql(): string;
}
325
// Interface exposed to engine.ts to pump in the data as new row batches arrive.
export interface WritableQueryResult extends QueryResult {
  // |resBytes| is a proto-encoded trace_processor.QueryResult message.
  // The overall flow looks as follows:
  // - The user calls engine.query('select ...') and gets a QueryResult back.
  // - The query call posts a message to the worker that runs the SQL engine
  //   (or sends a HTTP request in case of the RPC+HTTP interface).
  // - The returned QueryResult object is initially empty.
  // - Over time, the sql engine will postMessage() back results in batches.
  // - Each batch will end up calling this appendResultBatch() method.
  // - Pending promises are then settled: a waitMoreRows() promise on every
  //   call, a waitAllRows() promise only once the last batch has arrived.
  appendResultBatch(resBytes: Uint8Array): void;
}
341
// The actual implementation, which bridges together the reader side and the
// writer side (the one exposed to the Engine). This is the same object so that
// when the engine pumps new row batches we can resolve pending promises that
// readers (e.g. track code) are waiting for.
class QueryResultImpl implements QueryResult, WritableQueryResult {
  columnNames: string[] = [];
  private _error?: string;
  private _numRows = 0;
  private _isComplete = false;
  private _errorInfo: QueryErrorInfo;
  private _statementCount = 0;
  private _statementWithOutputCount = 0;
  private _lastStatementSql = '';

  constructor(errorInfo: QueryErrorInfo) {
    this._errorInfo = errorInfo;
  }

  // --- QueryResult implementation.

  // TODO(primiano): for the moment new batches are appended but old batches
  // are never removed. This won't work with abnormally large result sets, as
  // it will stash all rows in memory. We could switch to a model where the
  // iterator is destructive and deletes batch objects once iterating past the
  // end of each batch. If we do that, than we need to assign monotonic IDs to
  // batches. Also if we do that, we should prevent creating more than one
  // iterator for a QueryResult.
  batches: ResultBatch[] = [];

  // Promise awaiting on waitAllRows(). This should be resolved only when the
  // last result batch has been been retrieved.
  private allRowsPromise?: Deferred<QueryResult>;

  // Promise awaiting on waitMoreRows(). This resolved when the next
  // batch is appended via appendResultBatch.
  private moreRowsPromise?: Deferred<QueryResult>;

  isComplete(): boolean {
    return this._isComplete;
  }
  numRows(): number {
    return this._numRows;
  }
  error(): string | undefined {
    return this._error;
  }
  columns(): string[] {
    return this.columnNames;
  }
  statementCount(): number {
    return this._statementCount;
  }
  statementWithOutputCount(): number {
    return this._statementWithOutputCount;
  }
  lastStatementSql(): string {
    return this._lastStatementSql;
  }

  iter<T extends Row>(spec: T): RowIterator<T> {
    const impl = new RowIteratorImplWithRowData(spec, this);
    return impl as {} as RowIterator<T>;
  }

  firstRow<T extends Row>(spec: T): T {
    const impl = new RowIteratorImplWithRowData(spec, this);
    assertTrue(impl.valid());
    return impl as {} as RowIterator<T> as T;
  }

  maybeFirstRow<T extends Row>(spec: T): T | undefined {
    const impl = new RowIteratorImplWithRowData(spec, this);
    if (!impl.valid()) {
      return undefined;
    }
    return impl as {} as RowIterator<T> as T;
  }

  // Can be called only once (asserts otherwise); use ensureAllRowsPromise()
  // when the promise may already exist.
  waitAllRows(): Promise<QueryResult> {
    assertTrue(this.allRowsPromise === undefined);
    this.allRowsPromise = defer<QueryResult>();
    if (this._isComplete) {
      this.resolveOrReject(this.allRowsPromise, this);
    }
    return this.allRowsPromise;
  }

  // Returns the pending "more rows" promise, creating it if needed. Once the
  // result is complete, a fresh already-settled promise is returned each time.
  waitMoreRows(): Promise<QueryResult> {
    if (this.moreRowsPromise !== undefined) {
      return this.moreRowsPromise;
    }

    const moreRowsPromise = defer<QueryResult>();
    if (this._isComplete) {
      this.resolveOrReject(moreRowsPromise, this);
    } else {
      this.moreRowsPromise = moreRowsPromise;
    }
    return moreRowsPromise;
  }

  // --- WritableQueryResult implementation.

  // Called by the engine when a new QueryResult is available. Note that a
  // single Query() call can yield >1 QueryResult due to result batching
  // when the result is large, e.g. when returning O(M) rows.
  // |resBytes| is a proto-encoded trace_processor.QueryResult message.
  // It is fine to retain the resBytes without slicing a copy, because
  // ProtoRingBuffer does the slice() for us (or passes through the buffer
  // coming from postMessage() (Wasm case) or fetch() (HTTP+RPC case)).
  appendResultBatch(resBytes: Uint8Array) {
    const reader = protobuf.Reader.create(resBytes);
    assertTrue(reader.pos === 0);
    const columnNamesEmptyAtStartOfBatch = this.columnNames.length === 0;
    const columnNamesSet = new Set<string>();
    while (reader.pos < reader.len) {
      const tag = reader.uint32();
      switch (tag >>> 3) {
        case 1: {
          // column_names
          // Only the first batch should contain the column names. If this fires
          // something is going wrong in the handling of the batch stream.
          assertTrue(columnNamesEmptyAtStartOfBatch);
          const origColName = reader.string();
          let colName = origColName;
          // In some rare cases two columns can have the same name (b/194891824)
          // e.g. `select 1 as x, 2 as x`. These queries don't happen in the
          // UI code, but they can happen when the user types a query (e.g.
          // with a join). The most practical thing we can do here is renaming
          // the columns with a suffix. Keeping the same name will break when
          // iterating, because column names become iterator object keys.
          for (let i = 1; columnNamesSet.has(colName); ++i) {
            colName = `${origColName}_${i}`;
            assertTrue(i < 100); // Give up at some point;
          }
          columnNamesSet.add(colName);
          this.columnNames.push(colName);
          break;
        }
        case 2: {
          // error
          // The query has errored only if the |error| field is non-empty.
          // In protos, we don't distinguish between non-present and empty.
          // Make sure we don't propagate ambiguous empty strings to JS.
          const err = reader.string();
          this._error = err !== undefined && err.length ? err : undefined;
          break;
        }
        case 3: {
          // batch
          const batchLen = reader.uint32();
          // subarray() silently clamps out-of-range ends, which would turn a
          // truncated message into a partial (misparsed) CellsBatch.
          assertTrue(reader.pos + batchLen <= reader.len);
          const batchRaw = resBytes.subarray(reader.pos, reader.pos + batchLen);
          reader.pos += batchLen;

          // The ResultBatch ctor parses the CellsBatch submessage.
          const parsedBatch = new ResultBatch(batchRaw);
          this.batches.push(parsedBatch);
          this._isComplete = parsedBatch.isLastBatch;

          // In theory one could construct a valid proto serializing the column
          // names after the cell batches. In practice the QueryResultSerializer
          // doesn't do that so it's not worth complicating the code.
          const numColumns = this.columnNames.length;
          if (numColumns !== 0) {
            assertTrue(parsedBatch.numCells % numColumns === 0);
            this._numRows += parsedBatch.numCells / numColumns;
          } else {
            // numColumns == 0 is plausible for queries like CREATE TABLE ... .
            assertTrue(parsedBatch.numCells === 0);
          }
          break;
        }

        case 4: // statement count
          this._statementCount = reader.uint32();
          break;

        case 5: // count of statements that produced output
          this._statementWithOutputCount = reader.uint32();
          break;

        case 6: // SQL text of the last statement
          this._lastStatementSql = reader.string();
          break;

        default:
          console.warn(`Unexpected QueryResult field ${tag >>> 3}`);
          reader.skipType(tag & 7);
          break;
      } // switch (tag)
    } // while (pos < end)

    // Any new data (or completion) wakes up a pending waitMoreRows() caller.
    if (this.moreRowsPromise !== undefined) {
      this.resolveOrReject(this.moreRowsPromise, this);
      this.moreRowsPromise = undefined;
    }

    if (this._isComplete && this.allRowsPromise !== undefined) {
      this.resolveOrReject(this.allRowsPromise, this);
    }
  }

  // Like waitAllRows(), but safe to call repeatedly.
  ensureAllRowsPromise(): Promise<QueryResult> {
    if (this.allRowsPromise === undefined) {
      this.waitAllRows(); // Will populate |this.allRowsPromise|.
    }
    return assertExists(this.allRowsPromise);
  }

  get errorInfo(): QueryErrorInfo {
    return this._errorInfo;
  }

  // Resolves |promise| with |arg|, or rejects it with a QueryError if the
  // query reported an error.
  private resolveOrReject(promise: Deferred<QueryResult>, arg: QueryResult) {
    if (this._error === undefined) {
      promise.resolve(arg);
    } else {
      promise.reject(new QueryError(this._error, this._errorInfo));
    }
  }
}
558
// This class holds onto a received result batch (a Uint8Array) and does some
// partial parsing to tokenize the various cell groups. This parsing mainly
// consists of identifying and caching the offsets of each cell group; the
// varint cells are only located here and decoded lazily by the iterator.
// This half parsing is done to keep the iterator's next() fast, without
// decoding everything into memory.
// This is an internal implementation detail and is not exposed outside. The
// RowIteratorImpl uses this class to iterate through batches (this class takes
// care of iterating within a batch, RowIteratorImpl takes care of switching
// batches when needed).
// Note: at any point in time there can be more than one RowIteratorImpl
// referencing the same batch. The batch must be immutable.
class ResultBatch {
  readonly isLastBatch: boolean = false;
  readonly batchBytes: Uint8Array;
  // All *Off fields are offsets relative to the start of |batchBytes|.
  // Cell types are stored one byte per cell, so cellTypesLen == cell count.
  readonly cellTypesOff: number = 0;
  readonly cellTypesLen: number = 0;
  readonly varintOff: number = 0;
  readonly varintLen: number = 0;
  readonly float64Cells = new Float64Array();
  readonly blobCells: Uint8Array[] = [];
  readonly stringCells: string[] = [];

  // batchBytes is a trace_processor.QueryResult.CellsBatch proto.
  constructor(batchBytes: Uint8Array) {
    this.batchBytes = batchBytes;
    const reader = protobuf.Reader.create(batchBytes);
    assertTrue(reader.pos === 0);
    const end = reader.len;

    // Here we deconstruct the proto by hand. The CellsBatch is carefully
    // designed to allow a very fast parsing from the TS side. We pack all cells
    // of the same types together, so we can do only one call (per batch) to
    // TextDecoder.decode(), we can overlay a memory-aligned typedarray for
    // float values and can quickly tell and type-check the cell types.
    // One row = N cells (we know the number upfront from the outer message).
    // Each batch contains always an integer multiple of N cells (i.e. rows are
    // never fragmented across different batches).
    while (reader.pos < end) {
      const tag = reader.uint32();
      switch (tag >>> 3) {
        case 1: {
          // cell types, a packed array containing one CellType per cell.
          assertTrue((tag & 7) === TAG_LEN_DELIM); // Must be packed varint.
          this.cellTypesLen = reader.uint32();
          this.cellTypesOff = reader.pos;
          assertTrue(this.cellTypesOff + this.cellTypesLen <= end);
          reader.pos += this.cellTypesLen;
          break;
        }

        case 2: {
          // varint_cells, a packed varint buffer.
          assertTrue((tag & 7) === TAG_LEN_DELIM); // Must be packed varint.
          const packLen = reader.uint32();
          this.varintOff = reader.pos;
          this.varintLen = packLen;
          assertTrue(reader.buf === batchBytes);
          // |varintOff| is relative to |batchBytes| (not to the underlying
          // ArrayBuffer), so it must be bounded by the batch length.
          assertTrue(this.varintOff + this.varintLen <= end);
          reader.pos += packLen;
          break;
        }

        case 3: {
          // float64_cells, a 64-bit aligned packed fixed64 buffer.
          assertTrue((tag & 7) === TAG_LEN_DELIM); // Must be packed fixed64.
          const f64Len = reader.uint32();
          assertTrue(f64Len % 8 === 0);
          // Otherwise the typed-array overlay below could read bytes that
          // belong to whatever follows this batch in the shared buffer.
          assertTrue(reader.pos + f64Len <= end);
          // Float64Array's constructor is evil: the offset is in bytes but the
          // length is in 8-byte words.
          const f64Words = f64Len / 8;
          const f64Off = batchBytes.byteOffset + reader.pos;
          if (f64Off % 8 === 0) {
            this.float64Cells = new Float64Array(
              batchBytes.buffer,
              f64Off,
              f64Words,
            );
          } else {
            // When using the production code in trace_processor's rpc.cc, the
            // float64 should be 8-bytes aligned. The slow-path case is only for
            // tests.
            const slice = batchBytes.buffer.slice(f64Off, f64Off + f64Len);
            this.float64Cells = new Float64Array(slice);
          }
          reader.pos += f64Len;
          break;
        }

        case 4: // blob_cells: one entry per blob.
          assertTrue((tag & 7) === TAG_LEN_DELIM);
          // protobufjs's bytes() under the hoods calls slice() and creates
          // a copy. Fine here as blobs are rare and not a fastpath.
          this.blobCells.push(new Uint8Array(reader.bytes()));
          break;

        case 5: {
          // string_cells: all the string cells concatenated with \0s.
          assertTrue((tag & 7) === TAG_LEN_DELIM);
          const strLen = reader.uint32();
          assertTrue(reader.pos + strLen <= end);
          const subArr = batchBytes.subarray(reader.pos, reader.pos + strLen);
          assertTrue(subArr.length === strLen);
          // The reason why we do this split rather than creating one string
          // per entry is that utf8 decoding has some non-negligible cost. See
          // go/postmessage-benchmark .
          this.stringCells = utf8Decode(subArr).split('\0');
          reader.pos += strLen;
          break;
        }

        case 6: // is_last_batch (boolean).
          this.isLastBatch = !!reader.bool();
          break;

        case 7: // padding for realignment, skip silently.
          reader.skipType(tag & 7);
          break;

        default:
          console.warn(`Unexpected QueryResult.CellsBatch field ${tag >>> 3}`);
          reader.skipType(tag & 7);
          break;
      } // switch(tag)
    } // while (pos < end)
  }

  // Number of cells in this batch (one cell-type byte per cell).
  get numCells() {
    return this.cellTypesLen;
  }
}
683
684class RowIteratorImpl implements RowIteratorBase {
685  // The spec passed to the iter call containing the expected types, e.g.:
686  // {'colA': NUM, 'colB': NUM_NULL, 'colC': STRING}.
687  // This doesn't ever change.
688  readonly rowSpec: Row;
689
690  // The object that holds the current row. This points to the parent
691  // RowIteratorImplWithRowData instance that created this class.
692  rowData: Row;
693
694  // The QueryResult object we are reading data from. The engine will pump
695  // batches over time into this object.
696  private resultObj: QueryResultImpl;
697
698  // All the member variables in the group below point to the identically-named
699  // members in result.batch[batchIdx]. This is to avoid indirection layers in
700  // the next() hotpath, so we can do this.float64Cells vs
701  // this.resultObj.batch[this.batchIdx].float64Cells.
702  // These are re-set every time tryMoveToNextBatch() is called (and succeeds).
703  private batchIdx = -1; // The batch index within |result.batches[]|.
704  private batchBytes = new Uint8Array();
705  private columnNames: string[] = [];
706  private numColumns = 0;
707  private cellTypesEnd = -1; // -1 so the 1st next() hits tryMoveToNextBatch().
708  private float64Cells = new Float64Array();
709  private varIntReader = protobuf.Reader.create(this.batchBytes);
710  private blobCells: Uint8Array[] = [];
711  private stringCells: string[] = [];
712
713  // These members instead are incremented as we read cells from next(). They
714  // are the mutable state of the iterator.
715  private nextCellTypeOff = 0;
716  private nextFloat64Cell = 0;
717  private nextStringCell = 0;
718  private nextBlobCell = 0;
719  private isValid = false;
720
  // Creates an iterator over |res| and positions it on the first row, if any,
  // by calling next().
  // @param querySpec expected type of each column, e.g. {id: NUM, name: STR}.
  // @param rowData object that next() writes the current row's values into.
  // @param res the query result whose batches are iterated.
  constructor(querySpec: Row, rowData: Row, res: QueryResultImpl) {
    // Copies every spec key onto the iterator itself. The assignments below
    // run afterwards, so spec keys named like these fields get overwritten.
    // NOTE(review): a column named like an iterator method (e.g. `next`)
    // would shadow that method on this instance — confirm this is acceptable.
    Object.assign(this, querySpec);
    this.rowData = rowData;
    this.rowSpec = {...querySpec}; // ... -> Copy all the key/value pairs.
    this.resultObj = res;
    this.next();
  }
728
729  valid(): boolean {
730    return this.isValid;
731  }
732
733  private makeError(message: string): QueryError {
734    return new QueryError(message, this.resultObj.errorInfo);
735  }
736
737  get(columnName: string): ColumnType {
738    const res = this.rowData[columnName];
739    if (res === undefined) {
740      throw this.makeError(
741        `Column '${columnName}' doesn't exist. ` +
742          `Actual columns: [${this.columnNames.join(',')}]`,
743      );
744    }
745    return res;
746  }
747
748  // Moves the cursor next by one row and updates |isValid|.
749  // When this fails to move, two cases are possible:
750  // 1. We reached the end of the result set (this is the case if
751  //    QueryResult.isComplete() == true when this fails).
752  // 2. We reached the end of the current batch, but more rows might come later
753  //    (if QueryResult.isComplete() == false).
754  next() {
755    // At some point we might reach the end of the current batch, but the next
756    // batch might be available already. In this case we want next() to
757    // transparently move on to the next batch.
758    while (this.nextCellTypeOff + this.numColumns > this.cellTypesEnd) {
759      // If TraceProcessor is behaving well, we should never end up in a
760      // situation where we have leftover cells. TP is expected to serialize
761      // whole rows in each QueryResult batch and NOT truncate them midway.
762      // If this assert fires the TP RPC logic has a bug.
763      assertTrue(
764        this.nextCellTypeOff === this.cellTypesEnd || this.cellTypesEnd === -1,
765      );
766      if (!this.tryMoveToNextBatch()) {
767        this.isValid = false;
768        return;
769      }
770    }
771
772    const rowData = this.rowData;
773    const numColumns = this.numColumns;
774
775    // Read the current row.
776    for (let i = 0; i < numColumns; i++) {
777      const cellType = this.batchBytes[this.nextCellTypeOff++];
778      const colName = this.columnNames[i];
779      const expType = this.rowSpec[colName];
780
781      switch (cellType) {
782        case CellType.CELL_NULL:
783          rowData[colName] = null;
784          break;
785
786        case CellType.CELL_VARINT:
787          if (expType === NUM || expType === NUM_NULL) {
788            // This is very subtle. The return type of int64 can be either a
789            // number or a Long.js {high:number, low:number} if Long.js is
790            // installed. The default state seems different in node and browser.
791            // We force-disable Long.js support in the top of this source file.
792            const val = this.varIntReader.int64();
793            rowData[colName] = val as {} as number;
794          } else {
795            // LONG, LONG_NULL, or unspecified - return as bigint
796            const value = decodeInt64Varint(
797              this.batchBytes,
798              this.varIntReader.pos,
799            );
800            rowData[colName] = value;
801            this.varIntReader.skip(); // Skips a varint
802          }
803          break;
804
805        case CellType.CELL_FLOAT64:
806          rowData[colName] = this.float64Cells[this.nextFloat64Cell++];
807          break;
808
809        case CellType.CELL_STRING:
810          rowData[colName] = this.stringCells[this.nextStringCell++];
811          break;
812
813        case CellType.CELL_BLOB:
814          const blob = this.blobCells[this.nextBlobCell++];
815          rowData[colName] = blob;
816          break;
817
818        default:
819          throw this.makeError(`Invalid cell type ${cellType}`);
820      }
821    } // For (cells)
822    this.isValid = true;
823  }
824
825  private tryMoveToNextBatch(): boolean {
826    const nextBatchIdx = this.batchIdx + 1;
827    if (nextBatchIdx >= this.resultObj.batches.length) {
828      return false;
829    }
830
831    this.columnNames = this.resultObj.columnNames;
832    this.numColumns = this.columnNames.length;
833
834    this.batchIdx = nextBatchIdx;
835    const batch = assertExists(this.resultObj.batches[nextBatchIdx]);
836    this.batchBytes = batch.batchBytes;
837    this.nextCellTypeOff = batch.cellTypesOff;
838    this.cellTypesEnd = batch.cellTypesOff + batch.cellTypesLen;
839    this.float64Cells = batch.float64Cells;
840    this.blobCells = batch.blobCells;
841    this.stringCells = batch.stringCells;
842    this.varIntReader = protobuf.Reader.create(batch.batchBytes);
843    this.varIntReader.pos = batch.varintOff;
844    this.varIntReader.len = batch.varintOff + batch.varintLen;
845    this.nextFloat64Cell = 0;
846    this.nextStringCell = 0;
847    this.nextBlobCell = 0;
848
849    // Check that all the expected columns are present.
850    for (const expectedCol of Object.keys(this.rowSpec)) {
851      if (this.columnNames.indexOf(expectedCol) < 0) {
852        throw this.makeError(
853          `Column ${expectedCol} not found in the SQL result ` +
854            `set {${this.columnNames.join(' ')}}`,
855        );
856      }
857    }
858
859    // Check that the cells types are consistent.
860    const numColumns = this.numColumns;
861    if (batch.numCells === 0) {
862      // This can happen if the query result contains just an error. In this
863      // an empty batch with isLastBatch=true is appended as an EOF marker.
864      // In theory TraceProcessor could return an empty batch in the middle and
865      // that would be fine from a protocol viewpoint. In practice, no code path
866      // does that today so it doesn't make sense trying supporting it with a
867      // recursive call to tryMoveToNextBatch().
868      assertTrue(batch.isLastBatch);
869      return false;
870    }
871
872    assertTrue(numColumns > 0);
873    for (let i = this.nextCellTypeOff; i < this.cellTypesEnd; i++) {
874      const col = (i - this.nextCellTypeOff) % numColumns;
875      const colName = this.columnNames[col];
876      const actualType = this.batchBytes[i] as CellType;
877      const expType = this.rowSpec[colName];
878
879      // If undefined, the caller doesn't want to read this column at all, so
880      // it can be whatever.
881      if (expType === undefined) continue;
882
883      let err = '';
884      if (!isCompatible(actualType, expType)) {
885        if (actualType === CellType.CELL_NULL) {
886          err =
887            'SQL value is NULL but that was not expected' +
888            ` (expected type: ${columnTypeToString(expType)}). ` +
889            'Did you mean NUM_NULL, LONG_NULL, STR_NULL or BLOB_NULL?';
890        } else {
891          err = `Incompatible cell type. Expected: ${columnTypeToString(
892            expType,
893          )} actual: ${CELL_TYPE_NAMES[actualType]}`;
894        }
895      }
896      if (err.length > 0) {
897        const row = Math.floor(i / numColumns);
898        const message = `Error @ row: ${row} col: '${colName}': ${err}`;
899        throw this.makeError(message);
900      }
901    }
902    return true;
903  }
904}
905
906// This is the object ultimately returned to the client when calling
907// QueryResult.iter(...).
908// The only reason why this is disjoint from RowIteratorImpl is to avoid
// naming collisions between the member variables required by RowIteratorImpl
910// and the column names returned by the iterator.
class RowIteratorImplWithRowData implements RowIteratorBase {
  // The real iterator. It writes each decoded row straight onto |this|.
  private _impl: RowIteratorImpl;

  // Bound forwarders to |_impl|, exposed as instance properties.
  next: () => void;
  valid: () => boolean;
  get: (columnName: string) => ColumnType;

  constructor(querySpec: Row, res: QueryResultImpl) {
    // Seed |this| with the spec's keys before the iterator is created, so that
    // every requested column already exists as a property on this object.
    const self = this as {} as Row;
    Object.assign(self, querySpec);

    const impl = new RowIteratorImpl(querySpec, self, res);
    this._impl = impl;
    this.next = impl.next.bind(impl);
    this.valid = impl.valid.bind(impl);
    this.get = impl.get.bind(impl);
  }
}
927
928// This is a proxy object that wraps QueryResultImpl, adding await-ability.
929// This is so that:
930// 1. Clients that just want to await for the full result set can just call
931//    await engine.query('...') and will get a QueryResult that is guaranteed
932//    to be complete.
933// 2. Clients that know how to handle the streaming can use it straight away.
class WaitableQueryResultImpl
  implements QueryResult, WritableQueryResult, PromiseLike<QueryResult>
{
  // The wrapped result. Every QueryResult / WritableQueryResult method below
  // is a one-line delegation to it.
  private impl: QueryResultImpl;

  // Flipped by the first then() call; a second call trips assertFalse().
  private thenCalled = false;

  constructor(errorInfo: QueryErrorInfo) {
    this.impl = new QueryResultImpl(errorInfo);
  }

  // ---- QueryResult: row access ----

  iter<T extends Row>(spec: T) {
    return this.impl.iter(spec);
  }

  firstRow<T extends Row>(spec: T) {
    return this.impl.firstRow(spec);
  }

  maybeFirstRow<T extends Row>(spec: T) {
    return this.impl.maybeFirstRow(spec);
  }

  // ---- QueryResult: streaming state ----

  waitAllRows() {
    return this.impl.waitAllRows();
  }

  waitMoreRows() {
    return this.impl.waitMoreRows();
  }

  isComplete() {
    return this.impl.isComplete();
  }

  numRows() {
    return this.impl.numRows();
  }

  columns() {
    return this.impl.columns();
  }

  // ---- QueryResult: error and statement metadata ----

  error() {
    return this.impl.error();
  }

  statementCount() {
    return this.impl.statementCount();
  }

  statementWithOutputCount() {
    return this.impl.statementWithOutputCount();
  }

  lastStatementSql() {
    return this.impl.lastStatementSql();
  }

  // ---- WritableQueryResult ----

  appendResultBatch(resBytes: Uint8Array) {
    return this.impl.appendResultBatch(resBytes);
  }

  // ---- PromiseLike<QueryResult> implementation ----
  // then/catch/finally all chain onto impl.ensureAllRowsPromise().

  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  then(onFulfilled: any, onRejected: any): any {
    // This object may only be then()-ed once.
    assertFalse(this.thenCalled);
    this.thenCalled = true;
    const allRows = this.impl.ensureAllRowsPromise();
    return allRows.then(onFulfilled, onRejected);
  }

  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  catch(onRejected: any): any {
    const allRows = this.impl.ensureAllRowsPromise();
    return allRows.catch(onRejected);
  }

  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  finally(onFinally: () => void): any {
    const allRows = this.impl.ensureAllRowsPromise();
    return allRows.finally(onFinally);
  }

  // eslint and clang-format disagree on how to format get[foo](). Let
  // clang-format win:
  get [Symbol.toStringTag](): string {
    return 'Promise<WaitableQueryResult>';
  }
}
1012
/**
 * Creates an empty, awaitable query result for the given query.
 *
 * The returned object can be filled incrementally via appendResultBatch(),
 * read as it streams in, or awaited (then/catch/finally) as a whole.
 */
export function createQueryResult(
  errorInfo: QueryErrorInfo,
): QueryResult & Promise<QueryResult> & WritableQueryResult {
  const waitable = new WaitableQueryResultImpl(errorInfo);
  return waitable;
}
1018
1019// Throws if the value cannot be reasonably converted to a bigint.
1020// Assumes value is in native time units.
export function timeFromSql(value: ColumnType): time {
  // SQL NULL maps to the zero timestamp.
  if (value === null) {
    return Time.ZERO;
  }
  switch (typeof value) {
    case 'bigint':
      return Time.fromRaw(value);
    case 'number':
      // Fractional values are floored before conversion to bigint.
      return Time.fromRaw(BigInt(Math.floor(value)));
    default:
      // Strings and blobs have no sensible timestamp interpretation.
      throw Error(`Refusing to create time from unrelated type ${value}`);
  }
}
1032
1033// Throws if the value cannot be reasonably converted to a bigint.
1034// Assumes value is in nanoseconds.
export function durationFromSql(value: ColumnType): duration {
  // SQL NULL maps to a zero-length duration.
  if (value === null) {
    return Duration.ZERO;
  }
  // Fractional values are floored before conversion to bigint.
  if (typeof value === 'number') {
    return BigInt(Math.floor(value));
  }
  if (typeof value === 'bigint') {
    return value;
  }
  // Strings and blobs have no sensible duration interpretation.
  throw Error(`Refusing to create duration from unrelated type ${value}`);
}
1046