// Copyright (C) 2021 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This file deals with deserialization and iteration of the proto-encoded
// byte buffer that is returned by TraceProcessor when invoking the
// TPM_QUERY_STREAMING method. The returned |query_result| buffer is optimized
// for being moved cheaply across workers and decoded on the fly as we step
// through the iterator.
// See comments around QueryResult in trace_processor.proto for more details.

// The classes in this file are organized as follows:
//
// QueryResultImpl:
// The object returned by the Engine.query(sql) method.
// This object is a holder of row data. Batches of rows get appended
// incrementally as they are received from the remote TraceProcessor instance.
// QueryResultImpl also deals with asynchronicity of queries and allows callers
// to obtain a promise that waits for more (or all) rows.
// At any point in time the following objects hold a reference to QueryResult:
// - The Engine: for appending row batches.
// - UI code, typically controllers, who make queries.
//
// ResultBatch:
// Holds the data, returned by the remote TraceProcessor instance, for a number
// of rows (TP typically chunks the results in batches of 128KB).
// A QueryResultImpl holds exclusively ResultBatches for a given query.
38// ResultBatch is not exposed externally, it's just an internal representation 39// that helps with proto decoding. ResultBatch is immutable after it gets 40// appended and decoded. The iteration state is held by the RowIteratorImpl. 41// 42// RowIteratorImpl: 43// Decouples the data owned by QueryResultImpl (and its ResultBatch(es)) from 44// the iteration state. The iterator effectively is the union of a ResultBatch 45// and the row number in it. Rows within the batch are decoded as the user calls 46// next(). When getting at the end of the batch, it takes care of switching to 47// the next batch (if any) within the QueryResultImpl. 48// This object is part of the API exposed to tracks / controllers. 49 50// Ensure protobuf is initialized. 51import '../base/static_initializers'; 52import protobuf from 'protobufjs/minimal'; 53import {defer, Deferred} from '../base/deferred'; 54import {assertExists, assertFalse, assertTrue} from '../base/logging'; 55import {utf8Decode} from '../base/string_utils'; 56import {Duration, duration, Time, time} from '../base/time'; 57 58export type SqlValue = string | number | bigint | null | Uint8Array; 59// TODO(altimin): Replace ColumnType with SqlValue across the codebase and 60// remove export here. 
export type ColumnType = SqlValue;

// Sentinel values used in the |spec| object passed to QueryResult.iter() to
// declare the expected type of each column. They are matched with strict
// equality (===), so BLOB and BLOB_NULL are told apart by object identity and
// NUM_NULL (1) never matches LONG_NULL (1n).
export const UNKNOWN: ColumnType = null;
export const NUM = 0;
export const STR = 'str';
export const NUM_NULL: number | null = 1;
export const STR_NULL: string | null = 'str_null';
export const BLOB: Uint8Array = new Uint8Array();
export const BLOB_NULL: Uint8Array | null = new Uint8Array();
export const LONG: bigint = 0n;
export const LONG_NULL: bigint | null = 1n;

const SHIFT_32BITS = 32n;

// Fast decode of a varint-encoded int64 into a bigint.
// Inspired by
// https://github.com/protobufjs/protobuf.js/blob/56b1e64979dae757b67a21d326e16acee39f2267/src/reader.js#L123
//
// |buf| is the buffer holding the varint and |pos| the offset of its first
// byte. The value is accumulated into two unsigned 32-bit halves (|lo|, |hi|)
// and combined at the end. Encodings that use more than 5 bytes are
// interpreted as two's complement 64-bit values, so the result can be negative.
//
// Throws Error('Index out of range') if the buffer ends before the varint
// terminates and Error('invalid varint encoding') if the 10th byte still has
// the continuation bit set.
export function decodeInt64Varint(buf: Uint8Array, pos: number): bigint {
  let hi = 0;
  let lo = 0;
  let i = 0;

  if (buf.length - pos > 4) {
    // Fast route (lo): at least 5 bytes are available, no bounds checks needed.
    for (; i < 4; ++i) {
      // 1st..4th byte: bits 0..27 of |lo|.
      lo = (lo | ((buf[pos] & 127) << (i * 7))) >>> 0;
      if (buf[pos++] < 128) {
        return BigInt(lo);
      }
    }
    // 5th byte: its low 4 bits complete |lo|, the next 3 bits start |hi|.
    lo = (lo | ((buf[pos] & 127) << 28)) >>> 0;
    hi = (hi | ((buf[pos] & 127) >> 4)) >>> 0;
    if (buf[pos++] < 128) {
      return (BigInt(hi) << SHIFT_32BITS) | BigInt(lo);
    }
    i = 0;
  } else {
    // Slow route: at most 4 bytes are left, so a well-formed varint must
    // terminate within them and fits entirely in |lo|.
    for (; i < 4; ++i) {
      if (pos >= buf.length) {
        throw Error('Index out of range');
      }
      // 1st..4th byte: bits 0..27 of |lo|.
      lo = (lo | ((buf[pos] & 127) << (i * 7))) >>> 0;
      if (buf[pos++] < 128) {
        return BigInt(lo);
      }
    }
    // The last available byte still has the continuation bit set: the varint
    // is truncated. Report it rather than returning a partial value.
    throw Error('Index out of range');
  }

  if (buf.length - pos > 4) {
    // Fast route (hi): at least 5 more bytes are available.
    for (; i < 5; ++i) {
      // 6th..10th byte.
      hi = (hi | ((buf[pos] & 127) << (i * 7 + 3))) >>> 0;
      if (buf[pos++] < 128) {
        const big = (BigInt(hi) << SHIFT_32BITS) | BigInt(lo);
        return BigInt.asIntN(64, big);
      }
    }
  } else {
    for (; i < 5; ++i) {
      if (pos >= buf.length) {
        throw Error('Index out of range');
      }
      // 6th..10th byte.
      hi = (hi | ((buf[pos] & 127) << (i * 7 + 3))) >>> 0;
      if (buf[pos++] < 128) {
        const big = (BigInt(hi) << SHIFT_32BITS) | BigInt(lo);
        return BigInt.asIntN(64, big);
      }
    }
  }
  throw Error('invalid varint encoding');
}

// Info that could help debug a query error. For example the query
// in question, the stack where the query was issued, the active
// plugin etc.
export interface QueryErrorInfo {
  query: string;
}

// Error raised for failed queries. On top of the message it carries the SQL
// text of the offending query, which is also appended by toString().
export class QueryError extends Error {
  readonly query: string;

  constructor(message: string, info: QueryErrorInfo) {
    super(message);
    this.query = info.query;
  }

  toString() {
    return `${super.toString()}\nQuery:\n${this.query}`;
  }
}

// One row extracted from an SQL result:
export interface Row {
  [key: string]: ColumnType;
}

// The methods that any iterator has to implement.
export interface RowIteratorBase {
  valid(): boolean;
  next(): void;

  // Reflection support for cases where the column names are not known upfront
  // (e.g. the query result table for user-provided SQL queries).
  // It throws if the passed column name doesn't exist.
  // Example usage:
  // for (const it = queryResult.iter({}); it.valid(); it.next()) {
  //   for (const columnName of queryResult.columns()) {
  //     console.log(it.get(columnName));
  //   }
  // }
  get(columnName: string): ColumnType;
}

// A RowIterator is a type that has all the fields defined in the query spec
// plus the valid() and next() operators. This is to ultimately allow the
// clients to do:
// const result = await engine.query("select name, surname, id from people;");
// const iter = result.iter({name: STR, surname: STR, id: NUM});
// for (; iter.valid(); iter.next())
//   console.log(iter.name, iter.surname);
export type RowIterator<T extends Row> = RowIteratorBase & T;

// Returns the name of the sentinel constant |t| (e.g. 'NUM_NULL'), for use in
// error messages. Values that are not one of the sentinels are rendered as
// 'INVALID(<value>)'.
function columnTypeToString(t: ColumnType): string {
  switch (t) {
    case NUM:
      return 'NUM';
    case NUM_NULL:
      return 'NUM_NULL';
    case STR:
      return 'STR';
    case STR_NULL:
      return 'STR_NULL';
    case BLOB:
      return 'BLOB';
    case BLOB_NULL:
      return 'BLOB_NULL';
    case LONG:
      return 'LONG';
    case LONG_NULL:
      return 'LONG_NULL';
    case UNKNOWN:
      return 'UNKNOWN';
    default:
      return `INVALID(${t})`;
  }
}

// Returns true if a cell of wire type |actual| can be read through a column
// declared as |expected| in the iterator spec. UNKNOWN accepts any cell type.
// Throws if |actual| is not a known CellType.
function isCompatible(actual: CellType, expected: ColumnType): boolean {
  switch (actual) {
    case CellType.CELL_NULL:
      return (
        expected === NUM_NULL ||
        expected === STR_NULL ||
        expected === BLOB_NULL ||
        expected === LONG_NULL ||
        expected === UNKNOWN
      );
    case CellType.CELL_VARINT:
      return (
        expected === NUM ||
        expected === NUM_NULL ||
        expected === LONG ||
        expected === LONG_NULL ||
        expected === UNKNOWN
      );
    case CellType.CELL_FLOAT64:
      return expected === NUM || expected === NUM_NULL || expected === UNKNOWN;
    case CellType.CELL_STRING:
      return expected === STR || expected === STR_NULL || expected === UNKNOWN;
    case CellType.CELL_BLOB:
      return (
        expected === BLOB || expected === BLOB_NULL || expected === UNKNOWN
      );
    default:
      throw new Error(`Unknown CellType ${actual}`);
  }
}

// This has to match CellType in trace_processor.proto.
enum CellType {
  CELL_NULL = 1,
  CELL_VARINT = 2,
  CELL_FLOAT64 = 3,
  CELL_STRING = 4,
  CELL_BLOB = 5,
}

// Human-readable names used in error messages, indexed by CellType value.
// Index 0 is a placeholder because CellType values start at 1.
const CELL_TYPE_NAMES = [
  'UNKNOWN',
  'NULL',
  'VARINT',
  'FLOAT64',
  'STRING',
  'BLOB',
];

// Protobuf wire type of length-delimited fields (low 3 bits of a field tag).
const TAG_LEN_DELIM = 2;

// This is the interface exposed to readers (e.g. tracks). The underlying object
// (QueryResultImpl) owns the result data. This allows to obtain iterators on
// that. In future it will allow to wait for incremental updates (new rows being
// fetched) for streaming queries.
export interface QueryResult {
  // Obtains an iterator.
  // TODO(primiano): this should have an option to destruct data as we read. In
  // the case of a long query (e.g. `SELECT * FROM sched` in the query prompt)
  // we don't want to accumulate everything in memory. OTOH UI tracks want to
  // keep the data around so they can redraw them on each animation frame. For
  // now we keep everything in memory in the QueryResultImpl object.
  iter<T extends Row>(spec: T): RowIterator<T>;

  // Like iter() for queries that expect only one row. It embeds the valid()
  // check (i.e. throws if no rows are available) and returns directly the
  // first result.
  firstRow<T extends Row>(spec: T): T;

  // Like firstRow() but returns undefined if no rows are available.
  maybeFirstRow<T extends Row>(spec: T): T | undefined;

  // If != undefined the query errored out and error() contains the message.
  error(): string | undefined;

  // Returns the number of rows accumulated so far. Note that this number can
  // change over time as more batches are received. It becomes stable only
  // when isComplete() returns true or after waitAllRows() is resolved.
  numRows(): number;

  // If true all rows have been fetched. Calling iter() will iterate through the
  // last row. If false, iter() will return an iterator which might iterate
  // through some rows (or none) but will surely not reach the end.
  isComplete(): boolean;

  // Returns a promise that is resolved only when all rows (i.e. all batches)
  // have been fetched. The promise return value is always the object itself.
  waitAllRows(): Promise<QueryResult>;

  // Returns a promise that is resolved when either:
  // - more rows are available
  // - all rows are available
  // The promise return value is always the object itself.
  waitMoreRows(): Promise<QueryResult>;

  // Can return an empty array if called before the first batch is resolved.
  // This should be called only after having awaited for at least one batch.
  columns(): string[];

  // Returns the number of SQL statements in the query
  // (e.g. 2 for 'SELECT 1; SELECT 2;').
  statementCount(): number;

  // Returns the number of SQL statements that produced output rows. This number
  // is <= statementCount().
  statementWithOutputCount(): number;

  // Returns the last SQL statement.
  lastStatementSql(): string;
}

// Interface exposed to engine.ts to pump in the data as new row batches arrive.
export interface WritableQueryResult extends QueryResult {
  // |resBytes| is a proto-encoded trace_processor.QueryResult message.
  // The overall flow looks as follows:
  // - The user calls engine.query('select ...') and gets a QueryResult back.
  // - The query call posts a message to the worker that runs the SQL engine (
  //   or sends a HTTP request in case of the RPC+HTTP interface).
  // - The returned QueryResult object is initially empty.
  // - Over time, the sql engine will postMessage() back results in batches.
  // - Each batch will end up calling this appendResultBatch() method.
  // - If there is any pending promise (e.g. the caller called
  //   queryResult.waitAllRows()), this call will awake them (if this is the
  //   last batch).
  appendResultBatch(resBytes: Uint8Array): void;
}

// The actual implementation, which bridges together the reader side and the
// writer side (the one exposed to the Engine). This is the same object so that
// when the engine pumps new row batches we can resolve pending promises that
// readers (e.g. track code) are waiting for.
class QueryResultImpl implements QueryResult, WritableQueryResult {
  columnNames: string[] = [];
  private _error?: string;
  private _numRows = 0;
  private _isComplete = false;
  private _errorInfo: QueryErrorInfo;
  private _statementCount = 0;
  private _statementWithOutputCount = 0;
  private _lastStatementSql = '';

  constructor(errorInfo: QueryErrorInfo) {
    this._errorInfo = errorInfo;
  }

  // --- QueryResult implementation.

  // TODO(primiano): for the moment new batches are appended but old batches
  // are never removed. This won't work with abnormally large result sets, as
  // it will stash all rows in memory. We could switch to a model where the
  // iterator is destructive and deletes batch objects once iterating past the
  // end of each batch. If we do that, then we need to assign monotonic IDs to
  // batches. Also if we do that, we should prevent creating more than one
  // iterator for a QueryResult.
  batches: ResultBatch[] = [];

  // Promise awaiting on waitAllRows(). This should be resolved only when the
  // last result batch has been retrieved.
  private allRowsPromise?: Deferred<QueryResult>;

  // Promise awaiting on waitMoreRows(). This is resolved when the next
  // batch is appended via appendResultBatch.
  private moreRowsPromise?: Deferred<QueryResult>;

  isComplete(): boolean {
    return this._isComplete;
  }
  numRows(): number {
    return this._numRows;
  }
  error(): string | undefined {
    return this._error;
  }
  columns(): string[] {
    return this.columnNames;
  }
  statementCount(): number {
    return this._statementCount;
  }
  statementWithOutputCount(): number {
    return this._statementWithOutputCount;
  }
  lastStatementSql(): string {
    return this._lastStatementSql;
  }

  iter<T extends Row>(spec: T): RowIterator<T> {
    const impl = new RowIteratorImplWithRowData(spec, this);
    return impl as {} as RowIterator<T>;
  }

  firstRow<T extends Row>(spec: T): T {
    const impl = new RowIteratorImplWithRowData(spec, this);
    assertTrue(impl.valid());
    return impl as {} as RowIterator<T> as T;
  }

  maybeFirstRow<T extends Row>(spec: T): T | undefined {
    const impl = new RowIteratorImplWithRowData(spec, this);
    if (!impl.valid()) {
      return undefined;
    }
    return impl as {} as RowIterator<T> as T;
  }

  // Can be called only once.
  waitAllRows(): Promise<QueryResult> {
    assertTrue(this.allRowsPromise === undefined);
    this.allRowsPromise = defer<QueryResult>();
    if (this._isComplete) {
      this.resolveOrReject(this.allRowsPromise, this);
    }
    return this.allRowsPromise;
  }

  // Unlike waitAllRows() this can be called repeatedly: concurrent callers
  // share the same pending promise until the next batch arrives.
  waitMoreRows(): Promise<QueryResult> {
    if (this.moreRowsPromise !== undefined) {
      return this.moreRowsPromise;
    }

    const moreRowsPromise = defer<QueryResult>();
    if (this._isComplete) {
      this.resolveOrReject(moreRowsPromise, this);
    } else {
      this.moreRowsPromise = moreRowsPromise;
    }
    return moreRowsPromise;
  }

  // --- WritableQueryResult implementation.

  // Called by the engine when a new QueryResult is available. Note that a
  // single Query() call can yield >1 QueryResult due to result batching
  // if more than ~64K of data are returned, e.g. when returning O(M) rows.
  // |resBytes| is a proto-encoded trace_processor.QueryResult message.
  // It is fine to retain the resBytes without slicing a copy, because
  // ProtoRingBuffer does the slice() for us (or passes through the buffer
  // coming from postMessage() (Wasm case) or fetch() (HTTP+RPC case)).
  appendResultBatch(resBytes: Uint8Array) {
    const reader = protobuf.Reader.create(resBytes);
    assertTrue(reader.pos === 0);
    const columnNamesEmptyAtStartOfBatch = this.columnNames.length === 0;
    const columnNamesSet = new Set<string>();
    while (reader.pos < reader.len) {
      const tag = reader.uint32();
      switch (tag >>> 3) {
        case 1: {
          // column_names
          // Only the first batch should contain the column names. If this fires
          // something is going wrong in the handling of the batch stream.
          assertTrue(columnNamesEmptyAtStartOfBatch);
          const origColName = reader.string();
          let colName = origColName;
          // In some rare cases two columns can have the same name (b/194891824)
          // e.g. `select 1 as x, 2 as x`. These queries don't happen in the
          // UI code, but they can happen when the user types a query (e.g.
          // with a join). The most practical thing we can do here is renaming
          // the columns with a suffix. Keeping the same name will break when
          // iterating, because column names become iterator object keys.
          for (let i = 1; columnNamesSet.has(colName); ++i) {
            colName = `${origColName}_${i}`;
            assertTrue(i < 100); // Give up at some point;
          }
          columnNamesSet.add(colName);
          this.columnNames.push(colName);
          break;
        }

        case 2: {
          // error
          // The query has errored only if the |error| field is non-empty.
          // In protos, we don't distinguish between non-present and empty.
          // Make sure we don't propagate ambiguous empty strings to JS.
          const err = reader.string();
          this._error = err !== undefined && err.length ? err : undefined;
          break;
        }

        case 3: {
          // batch
          const batchLen = reader.uint32();
          const batchRaw = resBytes.subarray(reader.pos, reader.pos + batchLen);
          reader.pos += batchLen;

          // The ResultBatch ctor parses the CellsBatch submessage.
          const parsedBatch = new ResultBatch(batchRaw);
          this.batches.push(parsedBatch);
          this._isComplete = parsedBatch.isLastBatch;

          // In theory one could construct a valid proto serializing the column
          // names after the cell batches. In practice the QueryResultSerializer
          // doesn't do that so it's not worth complicating the code.
          const numColumns = this.columnNames.length;
          if (numColumns !== 0) {
            assertTrue(parsedBatch.numCells % numColumns === 0);
            this._numRows += parsedBatch.numCells / numColumns;
          } else {
            // numColumns == 0 is plausible for queries like CREATE TABLE ... .
            assertTrue(parsedBatch.numCells === 0);
          }
          break;
        }

        case 4:
          this._statementCount = reader.uint32();
          break;

        case 5:
          this._statementWithOutputCount = reader.uint32();
          break;

        case 6:
          this._lastStatementSql = reader.string();
          break;

        default:
          console.warn(`Unexpected QueryResult field ${tag >>> 3}`);
          reader.skipType(tag & 7);
          break;
      } // switch (tag)
    } // while (pos < end)

    // Wake up whoever is waiting for more rows. The promise is single-shot, so
    // drop it and let the next waitMoreRows() call create a new one.
    if (this.moreRowsPromise !== undefined) {
      this.resolveOrReject(this.moreRowsPromise, this);
      this.moreRowsPromise = undefined;
    }

    if (this._isComplete && this.allRowsPromise !== undefined) {
      this.resolveOrReject(this.allRowsPromise, this);
    }
  }

  // Returns the waitAllRows() promise, creating it if nobody asked for it yet.
  // Unlike waitAllRows() this can be called more than once.
  ensureAllRowsPromise(): Promise<QueryResult> {
    if (this.allRowsPromise === undefined) {
      this.waitAllRows(); // Will populate |this.allRowsPromise|.
    }
    return assertExists(this.allRowsPromise);
  }

  get errorInfo(): QueryErrorInfo {
    return this._errorInfo;
  }

  // Settles |promise|: resolves it with |arg| if the query succeeded, rejects
  // it with a QueryError if the query reported an error.
  private resolveOrReject(promise: Deferred<QueryResult>, arg: QueryResult) {
    if (this._error === undefined) {
      promise.resolve(arg);
    } else {
      promise.reject(new QueryError(this._error, this._errorInfo));
    }
  }
}

// This class holds onto a received result batch (a Uint8Array) and does some
// partial parsing to tokenize the various cell groups. This parsing mainly
// consists of identifying and caching the offsets of each cell group and
// initializing the varint decoders. This half parsing is done to keep the
// iterator's next() fast, without decoding everything into memory.
// This is an internal implementation detail and is not exposed outside. The
// RowIteratorImpl uses this class to iterate through batches (this class takes
// care of iterating within a batch, RowIteratorImpl takes care of switching
// batches when needed).
// Note: at any point in time there can be more than one ResultIterator
// referencing the same batch. The batch must be immutable.
class ResultBatch {
  readonly isLastBatch: boolean = false;
  readonly batchBytes: Uint8Array;
  // All the offsets below are relative to the start of |batchBytes|.
  readonly cellTypesOff: number = 0;
  readonly cellTypesLen: number = 0;
  readonly varintOff: number = 0;
  readonly varintLen: number = 0;
  readonly float64Cells = new Float64Array();
  readonly blobCells: Uint8Array[] = [];
  readonly stringCells: string[] = [];

  // batchBytes is a trace_processor.QueryResult.CellsBatch proto.
  constructor(batchBytes: Uint8Array) {
    this.batchBytes = batchBytes;
    const reader = protobuf.Reader.create(batchBytes);
    assertTrue(reader.pos === 0);
    const end = reader.len;

    // Here we deconstruct the proto by hand. The CellsBatch is carefully
    // designed to allow a very fast parsing from the TS side. We pack all cells
    // of the same types together, so we can do only one call (per batch) to
    // TextDecoder.decode(), we can overlay a memory-aligned typedarray for
    // float values and can quickly tell and type-check the cell types.
    // One row = N cells (we know the number upfront from the outer message).
    // Each batch contains always an integer multiple of N cells (i.e. rows are
    // never fragmented across different batches).
    while (reader.pos < end) {
      const tag = reader.uint32();
      switch (tag >>> 3) {
        case 1: // cell types, a packed array containing one CellType per cell.
          assertTrue((tag & 7) === TAG_LEN_DELIM); // Must be packed varint.
          this.cellTypesLen = reader.uint32();
          this.cellTypesOff = reader.pos;
          reader.pos += this.cellTypesLen;
          break;

        case 2: {
          // varint_cells, a packed varint buffer.
          assertTrue((tag & 7) === TAG_LEN_DELIM); // Must be packed varint.
          const packLen = reader.uint32();
          this.varintOff = reader.pos;
          this.varintLen = packLen;
          assertTrue(reader.buf === batchBytes);
          // |varintOff| is relative to the |batchBytes| view, so the region
          // must end within the view's own length (not its position in the
          // underlying ArrayBuffer).
          assertTrue(this.varintOff + this.varintLen <= batchBytes.length);
          reader.pos += packLen;
          break;
        }

        case 3: {
          // float64_cells, a 64-bit aligned packed fixed64 buffer.
          assertTrue((tag & 7) === TAG_LEN_DELIM); // Must be packed varint.
          const f64Len = reader.uint32();
          assertTrue(f64Len % 8 === 0);
          // Float64Array's constructor is evil: the offset is in bytes but the
          // length is in 8-byte words.
          const f64Words = f64Len / 8;
          const f64Off = batchBytes.byteOffset + reader.pos;
          if (f64Off % 8 === 0) {
            this.float64Cells = new Float64Array(
              batchBytes.buffer,
              f64Off,
              f64Words,
            );
          } else {
            // When using the production code in trace_processor's rpc.cc, the
            // float64 should be 8-bytes aligned. The slow-path case is only for
            // tests.
            const slice = batchBytes.buffer.slice(f64Off, f64Off + f64Len);
            this.float64Cells = new Float64Array(slice);
          }
          reader.pos += f64Len;
          break;
        }

        case 4: // blob_cells: one entry per blob.
          assertTrue((tag & 7) === TAG_LEN_DELIM);
          // protobufjs's bytes() under the hoods calls slice() and creates
          // a copy. Fine here as blobs are rare and not a fastpath.
          this.blobCells.push(new Uint8Array(reader.bytes()));
          break;

        case 5: {
          // string_cells: all the string cells concatenated with \0s.
          assertTrue((tag & 7) === TAG_LEN_DELIM);
          const strLen = reader.uint32();
          assertTrue(reader.pos + strLen <= end);
          const subArr = batchBytes.subarray(reader.pos, reader.pos + strLen);
          assertTrue(subArr.length === strLen);
          // The reason why we do this split rather than creating one string
          // per entry is that utf8 decoding has some non-negligible cost. See
          // go/postmessage-benchmark .
          this.stringCells = utf8Decode(subArr).split('\0');
          reader.pos += strLen;
          break;
        }

        case 6: // is_last_batch (boolean).
          this.isLastBatch = !!reader.bool();
          break;

        case 7: // padding for realignment, skip silently.
          reader.skipType(tag & 7);
          break;

        default:
          console.warn(`Unexpected QueryResult.CellsBatch field ${tag >>> 3}`);
          reader.skipType(tag & 7);
          break;
      } // switch(tag)
    } // while (pos < end)
  }

  // One byte of the cell-types array is read per cell, so its byte length is
  // also the number of cells in the batch.
  get numCells() {
    return this.cellTypesLen;
  }
}

class RowIteratorImpl implements RowIteratorBase {
  // The spec passed to the iter call containing the expected types, e.g.:
  // {'colA': NUM, 'colB': NUM_NULL, 'colC': STR}.
  // This doesn't ever change.
  readonly rowSpec: Row;

  // The object that holds the current row. This points to the parent
  // RowIteratorImplWithRowData instance that created this class.
  rowData: Row;

  // The QueryResult object we are reading data from. The engine will pump
  // batches over time into this object.
  private resultObj: QueryResultImpl;

  // All the member variables in the group below point to the identically-named
  // members in result.batch[batchIdx]. This is to avoid indirection layers in
  // the next() hotpath, so we can do this.float64Cells vs
  // this.resultObj.batch[this.batchIdx].float64Cells.
  // These are re-set every time tryMoveToNextBatch() is called (and succeeds).
  private batchIdx = -1; // The batch index within |result.batches[]|.
  private batchBytes = new Uint8Array();
  private columnNames: string[] = [];
  private numColumns = 0;
  private cellTypesEnd = -1; // -1 so the 1st next() hits tryMoveToNextBatch().
  private float64Cells = new Float64Array();
  private varIntReader = protobuf.Reader.create(this.batchBytes);
  private blobCells: Uint8Array[] = [];
  private stringCells: string[] = [];

  // These members instead are incremented as we read cells from next(). They
  // are the mutable state of the iterator.
  private nextCellTypeOff = 0;
  private nextFloat64Cell = 0;
  private nextStringCell = 0;
  private nextBlobCell = 0;
  private isValid = false;

  constructor(querySpec: Row, rowData: Row, res: QueryResultImpl) {
    // Deliberately do NOT copy the |querySpec| keys onto |this|: a column named
    // like one of our members (e.g. 'next' or 'batchIdx') would shadow a method
    // or corrupt the iteration state. Row values live only in |rowData|.
    this.rowData = rowData;
    this.rowSpec = {...querySpec}; // ... -> Copy all the key/value pairs.
    this.resultObj = res;
    this.next();
  }

  valid(): boolean {
    return this.isValid;
  }

  private makeError(message: string): QueryError {
    return new QueryError(message, this.resultObj.errorInfo);
  }

  get(columnName: string): ColumnType {
    const res = this.rowData[columnName];
    if (res === undefined) {
      throw this.makeError(
        `Column '${columnName}' doesn't exist. ` +
          `Actual columns: [${this.columnNames.join(',')}]`,
      );
    }
    return res;
  }

  // Moves the cursor next by one row and updates |isValid|.
  // When this fails to move, two cases are possible:
  // 1. We reached the end of the result set (this is the case if
  //    QueryResult.isComplete() == true when this fails).
  // 2. We reached the end of the current batch, but more rows might come later
  //    (if QueryResult.isComplete() == false).
  next() {
    // At some point we might reach the end of the current batch, but the next
    // batch might be available already. In this case we want next() to
    // transparently move on to the next batch.
    while (this.nextCellTypeOff + this.numColumns > this.cellTypesEnd) {
      // If TraceProcessor is behaving well, we should never end up in a
      // situation where we have leftover cells. TP is expected to serialize
      // whole rows in each QueryResult batch and NOT truncate them midway.
      // If this assert fires the TP RPC logic has a bug.
      assertTrue(
        this.nextCellTypeOff === this.cellTypesEnd || this.cellTypesEnd === -1,
      );
      if (!this.tryMoveToNextBatch()) {
        this.isValid = false;
        return;
      }
    }

    const rowData = this.rowData;
    const numColumns = this.numColumns;

    // Read the current row.
    for (let i = 0; i < numColumns; i++) {
      const cellType = this.batchBytes[this.nextCellTypeOff++];
      const colName = this.columnNames[i];
      const expType = this.rowSpec[colName];

      switch (cellType) {
        case CellType.CELL_NULL:
          rowData[colName] = null;
          break;

        case CellType.CELL_VARINT:
          if (expType === NUM || expType === NUM_NULL) {
            // This is very subtle. The return type of int64 can be either a
            // number or a Long.js {high:number, low:number} if Long.js is
            // installed. The default state seems different in node and browser.
            // We force-disable Long.js support in the top of this source file.
            const val = this.varIntReader.int64();
            rowData[colName] = val as {} as number;
          } else {
            // LONG, LONG_NULL, or unspecified - return as bigint
            const value = decodeInt64Varint(
              this.batchBytes,
              this.varIntReader.pos,
            );
            rowData[colName] = value;
            this.varIntReader.skip(); // Skips a varint
          }
          break;

        case CellType.CELL_FLOAT64:
          rowData[colName] = this.float64Cells[this.nextFloat64Cell++];
          break;

        case CellType.CELL_STRING:
          rowData[colName] = this.stringCells[this.nextStringCell++];
          break;

        case CellType.CELL_BLOB: {
          const blob = this.blobCells[this.nextBlobCell++];
          rowData[colName] = blob;
          break;
        }

        default:
          throw this.makeError(`Invalid cell type ${cellType}`);
      }
    } // For (cells)
    this.isValid = true;
  }

  // Points the iterator at the first row of the next batch, if one has been
  // received already, and type-checks all the cells of that batch against
  // |rowSpec|. Returns false if there is no further batch, or if the next one
  // is the empty end-of-stream marker. Throws a QueryError if a column of the
  // spec is missing or a cell has an incompatible type.
  private tryMoveToNextBatch(): boolean {
    const nextBatchIdx = this.batchIdx + 1;
    if (nextBatchIdx >= this.resultObj.batches.length) {
      return false;
    }

    this.columnNames = this.resultObj.columnNames;
    this.numColumns = this.columnNames.length;

    this.batchIdx = nextBatchIdx;
    const batch = assertExists(this.resultObj.batches[nextBatchIdx]);
    this.batchBytes = batch.batchBytes;
    this.nextCellTypeOff = batch.cellTypesOff;
    this.cellTypesEnd = batch.cellTypesOff + batch.cellTypesLen;
    this.float64Cells = batch.float64Cells;
    this.blobCells = batch.blobCells;
    this.stringCells = batch.stringCells;
    this.varIntReader = protobuf.Reader.create(batch.batchBytes);
    this.varIntReader.pos = batch.varintOff;
    this.varIntReader.len = batch.varintOff + batch.varintLen;
    this.nextFloat64Cell = 0;
    this.nextStringCell = 0;
    this.nextBlobCell = 0;

    // Check that all the expected columns are present.
    for (const expectedCol of Object.keys(this.rowSpec)) {
      if (!this.columnNames.includes(expectedCol)) {
        throw this.makeError(
          `Column ${expectedCol} not found in the SQL result ` +
            `set {${this.columnNames.join(' ')}}`,
        );
      }
    }

    // Check that the cells types are consistent.
    const numColumns = this.numColumns;
    if (batch.numCells === 0) {
      // This can happen if the query result contains just an error. In this
      // case an empty batch with isLastBatch=true is appended as an EOF marker.
      // In theory TraceProcessor could return an empty batch in the middle and
      // that would be fine from a protocol viewpoint. In practice, no code path
      // does that today so it doesn't make sense trying supporting it with a
      // recursive call to tryMoveToNextBatch().
      assertTrue(batch.isLastBatch);
      return false;
    }

    assertTrue(numColumns > 0);
    // |i| is a byte offset into |batchBytes|; cell indexes (and hence rows and
    // columns) must be computed relative to the start of the cell-types array.
    const firstCellOff = this.nextCellTypeOff;
    for (let i = firstCellOff; i < this.cellTypesEnd; i++) {
      const cellIdx = i - firstCellOff;
      const col = cellIdx % numColumns;
      const colName = this.columnNames[col];
      const actualType = this.batchBytes[i] as CellType;
      const expType = this.rowSpec[colName];

      // If undefined, the caller doesn't want to read this column at all, so
      // it can be whatever.
      if (expType === undefined) continue;

      let err = '';
      if (!isCompatible(actualType, expType)) {
        if (actualType === CellType.CELL_NULL) {
          err =
            'SQL value is NULL but that was not expected' +
            ` (expected type: ${columnTypeToString(expType)}). ` +
            'Did you mean NUM_NULL, LONG_NULL, STR_NULL or BLOB_NULL?';
        } else {
          err = `Incompatible cell type. Expected: ${columnTypeToString(
            expType,
          )} actual: ${CELL_TYPE_NAMES[actualType]}`;
        }
      }
      if (err.length > 0) {
        // Row index within the current batch.
        const row = Math.floor(cellIdx / numColumns);
        const message = `Error @ row: ${row} col: '${colName}': ${err}`;
        throw this.makeError(message);
      }
    }
    return true;
  }
}

// This is the object ultimately returned to the client when calling
// QueryResult.iter(...).
// The only reason why this is disjoint from RowIteratorImpl is to avoid
// naming collisions between the members variables required by RowIteratorImpl
// and the column names returned by the iterator.
class RowIteratorImplWithRowData implements RowIteratorBase {
  private _impl: RowIteratorImpl;

  next: () => void;
  valid: () => boolean;
  get: (columnName: string) => ColumnType;

  constructor(querySpec: Row, res: QueryResultImpl) {
    // This object doubles as the row: the column values are stored as its own
    // properties, next to the three bound iterator methods assigned below.
    const thisAsRow = this as {} as Row;
    Object.assign(thisAsRow, querySpec);
    this._impl = new RowIteratorImpl(querySpec, thisAsRow, res);
    this.next = this._impl.next.bind(this._impl);
    this.valid = this._impl.valid.bind(this._impl);
    this.get = this._impl.get.bind(this._impl);
  }
}

// This is a proxy object that wraps QueryResultImpl, adding await-ability.
// This is so that:
// 1. Clients that just want to await for the full result set can just call
//    await engine.query('...') and will get a QueryResult that is guaranteed
//    to be complete.
// 2. Clients that know how to handle the streaming can use it straight away.
class WaitableQueryResultImpl
  implements QueryResult, WritableQueryResult, PromiseLike<QueryResult>
{
  private impl: QueryResultImpl;
  private thenCalled = false;

  constructor(errorInfo: QueryErrorInfo) {
    this.impl = new QueryResultImpl(errorInfo);
  }

  // QueryResult implementation. Proxies all calls to the impl object.
945 iter<T extends Row>(spec: T) { 946 return this.impl.iter(spec); 947 } 948 firstRow<T extends Row>(spec: T) { 949 return this.impl.firstRow(spec); 950 } 951 maybeFirstRow<T extends Row>(spec: T) { 952 return this.impl.maybeFirstRow(spec); 953 } 954 waitAllRows() { 955 return this.impl.waitAllRows(); 956 } 957 waitMoreRows() { 958 return this.impl.waitMoreRows(); 959 } 960 isComplete() { 961 return this.impl.isComplete(); 962 } 963 numRows() { 964 return this.impl.numRows(); 965 } 966 columns() { 967 return this.impl.columns(); 968 } 969 error() { 970 return this.impl.error(); 971 } 972 statementCount() { 973 return this.impl.statementCount(); 974 } 975 statementWithOutputCount() { 976 return this.impl.statementWithOutputCount(); 977 } 978 lastStatementSql() { 979 return this.impl.lastStatementSql(); 980 } 981 982 // WritableQueryResult implementation. 983 appendResultBatch(resBytes: Uint8Array) { 984 return this.impl.appendResultBatch(resBytes); 985 } 986 987 // PromiseLike<QueryResult> implementaton. 988 989 // eslint-disable-next-line @typescript-eslint/no-explicit-any 990 then(onfulfilled: any, onrejected: any): any { 991 assertFalse(this.thenCalled); 992 this.thenCalled = true; 993 return this.impl.ensureAllRowsPromise().then(onfulfilled, onrejected); 994 } 995 996 // eslint-disable-next-line @typescript-eslint/no-explicit-any 997 catch(error: any): any { 998 return this.impl.ensureAllRowsPromise().catch(error); 999 } 1000 1001 // eslint-disable-next-line @typescript-eslint/no-explicit-any 1002 finally(callback: () => void): any { 1003 return this.impl.ensureAllRowsPromise().finally(callback); 1004 } 1005 1006 // eslint and clang-format disagree on how to format get[foo](). 
Let 1007 // clang-format win: 1008 get [Symbol.toStringTag](): string { 1009 return 'Promise<WaitableQueryResult>'; 1010 } 1011} 1012 1013export function createQueryResult( 1014 errorInfo: QueryErrorInfo, 1015): QueryResult & Promise<QueryResult> & WritableQueryResult { 1016 return new WaitableQueryResultImpl(errorInfo); 1017} 1018 1019// Throws if the value cannot be reasonably converted to a bigint. 1020// Assumes value is in native time units. 1021export function timeFromSql(value: ColumnType): time { 1022 if (typeof value === 'bigint') { 1023 return Time.fromRaw(value); 1024 } else if (typeof value === 'number') { 1025 return Time.fromRaw(BigInt(Math.floor(value))); 1026 } else if (value === null) { 1027 return Time.ZERO; 1028 } else { 1029 throw Error(`Refusing to create time from unrelated type ${value}`); 1030 } 1031} 1032 1033// Throws if the value cannot be reasonably converted to a bigint. 1034// Assumes value is in nanoseconds. 1035export function durationFromSql(value: ColumnType): duration { 1036 if (typeof value === 'bigint') { 1037 return value; 1038 } else if (typeof value === 'number') { 1039 return BigInt(Math.floor(value)); 1040 } else if (value === null) { 1041 return Duration.ZERO; 1042 } else { 1043 throw Error(`Refusing to create duration from unrelated type ${value}`); 1044 } 1045} 1046