# Copyright 2020 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Methods related to querying the ResultDB BigQuery tables."""

import concurrent.futures
import json
import logging
import math
import multiprocessing.pool
import os
import subprocess
import threading
import time
from typing import Any, Collection, Dict, Iterable, List, Optional, Tuple, Union

import six

from typ import expectations_parser
from typ import json_results
from unexpected_passes_common import builders as builders_module
from unexpected_passes_common import constants
from unexpected_passes_common import data_types
from unexpected_passes_common import expectations

DEFAULT_NUM_SAMPLES = 100
MAX_ROWS = (2**31) - 1
MAX_QUERY_TRIES = 3
# Used to prevent us from triggering too many queries simultaneously and causing
# a bunch of rate limit errors. Anything below 1.5 seemed to result in enough
# rate limit errors to cause problems. Raising above that for safety.
QUERY_DELAY = 2
# The target number of results/rows per query when running in large query mode.
# Higher values = longer individual query times and higher chances of running
# out of memory in BigQuery. Lower values = more parallelization overhead and
# more issues with rate limit errors.
TARGET_RESULTS_PER_QUERY = 20000

# Subquery for getting all try builds that were used for CL submission. 30 days
# is chosen because the ResultDB tables we pull data from only keep data around
# for 30 days.
SUBMITTED_BUILDS_TEMPLATE = """\
    SELECT
      CONCAT("build-", CAST(unnested_builds.id AS STRING)) as id
    FROM
      `commit-queue.{project_view}.attempts`,
      UNNEST(builds) as unnested_builds,
      UNNEST(gerrit_changes) as unnested_changes
    WHERE
      unnested_builds.host = "cr-buildbucket.appspot.com"
      AND unnested_changes.submit_status = "SUCCESS"
      AND start_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(),
                                     INTERVAL 30 DAY)"""
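# Illustrative note: with a hypothetical project_view of 'chromium', the
# template above would read from the `commit-queue.chromium.attempts` table.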

QueryResult = Dict[str, Any]
QueryParameters = Dict[str, Dict[str, Any]]

# pylint: disable=super-with-arguments,useless-object-inheritance


class BigQueryQuerier(object):
  """Class to handle all BigQuery queries for a script invocation."""

  def __init__(self,
               suite: Optional[str],
               project: str,
               num_samples: int,
               large_query_mode: bool,
               num_jobs: Optional[int],
               use_batching: bool = True):
    """
    Args:
      suite: A string containing the name of the suite that is being queried
          for. Can be None if there is no differentiation between different
          suites.
      project: A string containing the billing project to use for BigQuery.
      num_samples: An integer containing the number of builds to pull results
          from.
      large_query_mode: A boolean indicating whether large query mode should be
          used. In this mode, an initial, smaller query is made and its results
          are used to perform additional filtering on a second, larger query in
          BigQuery. This works around hitting a hard memory limit when running
          the ORDER BY clause.
      num_jobs: An integer specifying how many jobs to run in parallel. If
          None, all jobs are run simultaneously.
      use_batching: Whether to use batching when running queries. Batching
          allows a much greater amount of parallelism due to avoiding usage
          limits, but also adds a variable amount of overhead since there need
          to be free resources.
    """
    self._suite = suite
    self._project = project
    self._num_samples = num_samples or DEFAULT_NUM_SAMPLES
    self._large_query_mode = large_query_mode
    self._num_jobs = num_jobs
    self._use_batching = use_batching

    assert self._num_samples > 0
    assert (self._num_jobs is None or self._num_jobs > 0)

  def FillExpectationMapForBuilders(
      self, expectation_map: data_types.TestExpectationMap,
      builders: Collection[data_types.BuilderEntry]
  ) -> Dict[str, data_types.ResultListType]:
    """Fills |expectation_map| with results from |builders|.

    Args:
      expectation_map: A data_types.TestExpectationMap. Will be modified
          in-place.
      builders: An iterable of data_types.BuilderEntry containing the builders
          to query.

    Returns:
      A dict containing any results that were retrieved that did not have a
      matching expectation in |expectation_map| in the following format:
      {
        |builder_type|:|builder_name| (str): [
          result1 (data_types.Result),
          result2 (data_types.Result),
          ...
        ],
      }
    """
    start_time = time.time()
    logging.debug('Starting to fill expectation map for %d builders',
                  len(builders))
    assert isinstance(expectation_map, data_types.TestExpectationMap)
    # Ensure that all the builders are of the same type since we make some
    # assumptions about that later on.
    assert builders
    builder_type = None
    for b in builders:
      if builder_type is None:
        builder_type = b.builder_type
      else:
        assert b.builder_type == builder_type

    # Filter out any builders that we can easily determine do not currently
    # produce data we care about.
    builders = self._FilterOutInactiveBuilders(builders, builder_type)

    # If we don't have an explicit number of jobs set, spin up a separate
    # process for each query/add step. This is wasteful in the sense that we'll
    # have a bunch of idle processes once faster steps start finishing, but
    # ensures that we start slow queries early and avoids the overhead of
    # passing large amounts of data between processes. See crbug.com/1182459 for
    # more information on performance considerations.
    num_jobs = self._num_jobs or len(builders)
    args = [(b, expectation_map) for b in builders]

    tmp_expectation_map = data_types.TestExpectationMap()
    all_unmatched_results = {}

    with concurrent.futures.ThreadPoolExecutor(max_workers=num_jobs) as pool:
      for result in pool.map(self._QueryAddCombined, args):
        unmatched_results, prefixed_builder_name, merge_map = result
        tmp_expectation_map.Merge(merge_map, expectation_map)
        if unmatched_results:
          all_unmatched_results[prefixed_builder_name] = unmatched_results

    expectation_map.clear()
    expectation_map.update(tmp_expectation_map)

    logging.debug('Filling expectation map took %f', time.time() - start_time)
    return all_unmatched_results

  def _FilterOutInactiveBuilders(self,
                                 builders: Iterable[data_types.BuilderEntry],
                                 builder_type: str
                                 ) -> List[data_types.BuilderEntry]:
    """Filters out any builders that are not producing data.

    This helps save time on querying, as querying for the builder names is cheap
    while querying for individual results from a builder is expensive. Filtering
    out inactive builders lets us preemptively remove builders that we know we
    won't get any data from, and thus don't need to waste time querying.

    Args:
      builders: An iterable of data_types.BuilderEntry containing the builders
          to query.
      builder_type: A string containing the type of builder to query, either
          "ci" or "try".

    Returns:
      A copy of |builders| with any inactive builders removed.
    """
    include_internal_builders = any(b.is_internal_builder for b in builders)
    query = self._GetActiveBuilderQuery(
        builder_type, include_internal_builders).encode('utf-8')
    cmd = GenerateBigQueryCommand(self._project, {}, batch=False)
    with open(os.devnull, 'w', newline='', encoding='utf-8') as devnull:
      p = subprocess.Popen(cmd,
                           stdout=subprocess.PIPE,
                           stderr=devnull,
                           stdin=subprocess.PIPE)
    stdout, _ = p.communicate(query)
    if not isinstance(stdout, six.string_types):
      stdout = stdout.decode('utf-8')
    results = json.loads(stdout)

    # We filter from an initial list instead of directly using the returned
    # builders since there are cases where they aren't equivalent, such as for
    # GPU tests if a particular builder doesn't run a particular suite. This
    # could be encapsulated in the query, but doing so would cause the query to
    # take longer. Since generating the initial list locally is basically
    # instantaneous and we're optimizing for runtime, filtering is the better
    # option.
    active_builders = {r['builder_name'] for r in results}
    filtered_builders = [b for b in builders if b.name in active_builders]
    return filtered_builders

  def _QueryAddCombined(
      self,
      inputs: Tuple[data_types.BuilderEntry, data_types.TestExpectationMap]
  ) -> Tuple[data_types.ResultListType, str, data_types.TestExpectationMap]:
    """Combines the query and add steps for use in a process pool.

    Args:
      inputs: A tuple of inputs for QueryBuilder() and
          data_types.TestExpectationMap.AddResultList(), in the order
          (builder, expectation_map).

    Returns:
      A tuple (unmatched_results, prefixed_builder_name, expectation_map).
      |unmatched_results| is the output of
      data_types.TestExpectationMap.AddResultList(), |prefixed_builder_name| is
      the builder name prefixed with its project and builder type, and
      |expectation_map| is the input expectation map with the builder's results
      added.
225    """
226    start_time = time.time()
227    builder, expectation_map = inputs
228    logging.debug('Starting query for builder %s', builder.name)
229    results, expectation_files = self.QueryBuilder(builder)
230    logging.debug('Query for builder %s took %f', builder.name,
231                  time.time() - start_time)
232
233    start_time = time.time()
234    prefixed_builder_name = '%s/%s:%s' % (builder.project, builder.builder_type,
235                                          builder.name)
236    logging.debug('Starting data processing for builder %s', builder.name)
237    unmatched_results = expectation_map.AddResultList(prefixed_builder_name,
238                                                      results,
239                                                      expectation_files)
240    logging.debug('Data processing for builder %s took %f', builder.name,
241                  time.time() - start_time)
242
243    return unmatched_results, prefixed_builder_name, expectation_map
244
245  def QueryBuilder(self, builder: data_types.BuilderEntry
246                   ) -> Tuple[data_types.ResultListType, Optional[List[str]]]:
247    """Queries ResultDB for results from |builder|.
248
249    Args:
250      builder: A data_types.BuilderEntry containing the builder to query.
251
    Returns:
      A tuple (results, expectation_files). |results| is the results returned
      by the query converted into a list of data_types.Result objects.
      |expectation_files| is a list of strings denoting which expectation files
      are relevant to |results|, or None if all should be used.
257    """
258
259    query_generator = self._GetQueryGeneratorForBuilder(builder)
260    if not query_generator:
261      # No affected tests on this builder, so early return.
262      return [], None
263
264    # Query for the test data from the builder, splitting the query if we run
265    # into the BigQuery hard memory limit. Even if we keep failing, this will
266    # eventually stop due to getting a QuerySplitError when we can't split the
267    # query any further.
268    query_results = None
269    while query_results is None:
270      try:
271        query_results = self._RunBigQueryCommandsForJsonOutput(
272            query_generator.GetQueries(), {
273                '': {
274                    'builder_name': builder.name
275                },
276                'INT64': {
277                    'num_builds': self._num_samples
278                }
279            })
280      except MemoryLimitError:
281        logging.warning(
282            'Query to builder %s hit BigQuery hard memory limit, trying again '
283            'with more query splitting.', builder.name)
284        query_generator.SplitQuery()
285
286    results = []
287    if not query_results:
288      # Don't bother logging if we know this is a fake CI builder.
289      if not (builder.builder_type == constants.BuilderTypes.CI
290              and builder in builders_module.GetInstance().GetFakeCiBuilders()):
291        logging.warning(
292            'Did not get results for "%s", but this may be because its '
293            'results do not apply to any expectations for this suite.',
294            builder.name)
295      return results, None
296
297    # It's possible that a builder runs multiple versions of a test with
298    # different expectation files for each version. So, find a result for each
299    # unique step and get the expectation files from all of them.
300    results_for_each_step = {}
301    for qr in query_results:
302      step_name = qr['step_name']
303      if step_name not in results_for_each_step:
304        results_for_each_step[step_name] = qr
305
306    expectation_files = set()
307    for qr in results_for_each_step.values():
308      # None is a special value indicating "use all expectation files", so
309      # handle that.
310      ef = self._GetRelevantExpectationFilesForQueryResult(qr)
311      if ef is None:
312        expectation_files = None
313        break
314      expectation_files |= set(ef)
315    if expectation_files is not None:
316      expectation_files = list(expectation_files)
317
318    # The query result list is potentially very large, so reduce the list as we
319    # iterate over it instead of using a standard for/in so that we don't
320    # temporarily end up with a ~2x increase in memory.
321    while query_results:
322      r = query_results.pop()
323      if self._ShouldSkipOverResult(r):
324        continue
325      results.append(self._ConvertJsonResultToResultObject(r))
326    logging.debug('Got %d results for %s builder %s', len(results),
327                  builder.builder_type, builder.name)
328    return results, expectation_files
329
330  def _ConvertJsonResultToResultObject(self, json_result: QueryResult
331                                       ) -> data_types.Result:
332    """Converts a single BigQuery JSON result to a data_types.Result.
333
334    Args:
335      json_result: A single row/result from BigQuery in JSON format.
336
337    Returns:
338      A data_types.Result object containing the information from |json_result|.
339    """
340    build_id = _StripPrefixFromBuildId(json_result['id'])
341    test_name = self._StripPrefixFromTestId(json_result['test_id'])
342    actual_result = _ConvertActualResultToExpectationFileFormat(
343        json_result['status'])
344    tags = expectations.GetInstance().FilterToKnownTags(json_result['typ_tags'])
345    step = json_result['step_name']
346    return data_types.Result(test_name, tags, actual_result, step, build_id)
347
348  def _GetRelevantExpectationFilesForQueryResult(self, query_result: QueryResult
349                                                 ) -> Optional[Iterable[str]]:
350    """Gets the relevant expectation file names for a given query result.
351
352    Args:
353      query_result: A dict containing single row/result from a BigQuery query.
354
355    Returns:
      An iterable of strings containing expectation file names that are
      relevant to |query_result|, or None if all expectation files should be
      considered relevant.
    """
    raise NotImplementedError()

  def _ShouldSkipOverResult(self, result: QueryResult) -> bool:
    """Whether |result| should be ignored and skipped over.

    Args:
      result: A dict containing a single BigQuery result row.

    Returns:
      True if the result should be skipped over/ignored, otherwise False.
    """
    del result
    return False

  def _GetQueryGeneratorForBuilder(self, builder: data_types.BuilderEntry
                                   ) -> Optional['BaseQueryGenerator']:
    """Returns a BaseQueryGenerator instance to only include relevant tests.

    Args:
      builder: A data_types.BuilderEntry containing the builder to query.

    Returns:
      None if the query returned no results. Otherwise, some instance of a
      BaseQueryGenerator.
    """
    raise NotImplementedError()

  def _RunBigQueryCommandsForJsonOutput(self, queries: Union[str, List[str]],
                                        parameters: QueryParameters
                                        ) -> List[QueryResult]:
    """Runs the given BigQuery queries and returns their outputs as JSON.

    Args:
      queries: A string containing a single valid BigQuery query to run or a
          list of such strings.
      parameters: A dict specifying parameters to substitute in the query in
          the format {type: {key: value}}. For example, the dict:
          {'INT64': {'num_builds': 5}}
          would result in --parameter=num_builds:INT64:5 being passed to
          BigQuery.

    Returns:
      The combined results of |queries| in JSON.
    """
    if isinstance(queries, str):
      queries = [queries]
    assert isinstance(queries, list)

    processes = set()
    processes_lock = threading.Lock()

    def run_cmd_in_thread(inputs: Tuple[List[str], str]) -> str:
      cmd, query = inputs
      query = query.encode('utf-8')
      with open(os.devnull, 'w', newline='', encoding='utf-8') as devnull:
        with processes_lock:
          # Starting many queries at once causes us to hit rate limits much more
          # frequently, so stagger query starts to help avoid that.
          time.sleep(QUERY_DELAY)
          p = subprocess.Popen(cmd,
                               stdout=subprocess.PIPE,
                               stderr=devnull,
                               stdin=subprocess.PIPE)
          processes.add(p)

        # We pass in the query via stdin instead of including it on the
        # commandline because we can run into command length issues in large
        # query mode.
        stdout, _ = p.communicate(query)
        if not isinstance(stdout, six.string_types):
          stdout = stdout.decode('utf-8')
        if p.returncode:
          # When running many queries in parallel, it's possible to hit the
          # rate limit for the account if we're unlucky, so try again if we do.
          if 'Exceeded rate limits' in stdout:
            raise RateLimitError()
          error_msg = 'Error running command %s. stdout: %s' % (cmd, stdout)
          if 'memory' in stdout:
            raise MemoryLimitError(error_msg)
          raise RuntimeError(error_msg)
        return stdout

    def run_cmd(cmd: List[str], tries: int) -> List[str]:
      if tries >= MAX_QUERY_TRIES:
        raise RuntimeError('Query failed too many times, aborting')

      # We use a thread pool with a thread for each query/process instead of
      # just creating the processes due to guidance from the Python docs:
      # https://docs.python.org/3/library/subprocess.html#subprocess.Popen.stderr
      # We need to write to stdin to pass the query in, but using
      # stdout/stderr/stdin directly is discouraged due to the potential for
      # deadlocks. The suggested method (using .communicate()) blocks, so we
      # need the thread pool to maintain parallelism.
      pool = multiprocessing.pool.ThreadPool(len(queries))

      def cleanup():
        pool.terminate()
        for p in processes:
          try:
            p.terminate()
          except OSError:
            # We can fail to terminate if the process is already finished, so
            # ignore such failures.
            pass
        processes.clear()

      args = [(cmd, q) for q in queries]
      try:
        return pool.map(run_cmd_in_thread, args)
      except RateLimitError:
        logging.warning('Query hit rate limit, retrying')
        cleanup()
        return run_cmd(cmd, tries + 1)
      finally:
        cleanup()
      raise RuntimeError('Hit branch that should be unreachable')

    bq_cmd = GenerateBigQueryCommand(self._project,
                                     parameters,
                                     batch=self._use_batching)
    stdouts = run_cmd(bq_cmd, 0)
    combined_json = []
    for result in [json.loads(s) for s in stdouts]:
      for row in result:
        combined_json.append(row)
    return combined_json

  def _StripPrefixFromTestId(self, test_id: str) -> str:
    """Strips the prefix from a test ID, leaving only the test case name.

    Args:
      test_id: A string containing a full ResultDB test ID, e.g.
          ninja://target/directory.suite.class.test_case

    Returns:
      A string containing the test case's name extracted from |test_id|.
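
    Example (illustrative): a subclass might implement this as
        return test_id.split('.')[-1]
    which would map the example ID above to "test_case".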
496    """
497    raise NotImplementedError()
498
499  def _GetActiveBuilderQuery(self, builder_type: str,
500                             include_internal_builders: bool) -> str:
501    """Gets the SQL query for determining which builders actually produce data.
502
503    Args:
504      builder_type: A string containing the type of builders to query, either
505          "ci" or "try".
506      include_internal_builders: A boolean indicating whether internal builders
507          should be included in the data that the query will access.
508
    Returns:
      A string containing a SQL query that will get the names of all relevant
      builders that are active/producing data.
512    """
513    raise NotImplementedError()
514
515
516class BaseQueryGenerator(object):
517  """Abstract base class for query generators."""
518
519  def __init__(self, builder: data_types.BuilderEntry):
520    self._builder = builder
521
522  def SplitQuery(self) -> None:
523    """Splits the query into more clauses/queries."""
524    raise NotImplementedError('SplitQuery must be overridden in a child class')
525
526  def GetClauses(self) -> List[str]:
527    """Gets string representations of the test filters.
528
529    Returns:
530      A list of strings, each string being a valid SQL clause that applies a
531      portion of the test filter to a query.
532    """
533    raise NotImplementedError('GetClauses must be overridden in a child class')
534
535  def GetQueries(self) -> List[str]:
536    """Gets string representations of the queries to run.
537
538    Returns:
539      A list of strings, each string being a valid SQL query that queries a
540      portion of the tests of interest.
541    """
542    raise NotImplementedError('GetQueries must be overridden in a child class')
543
544
545# pylint: disable=abstract-method
546class FixedQueryGenerator(BaseQueryGenerator):
547  """Concrete test filter that cannot be split."""

  def __init__(self, builder: data_types.BuilderEntry, test_filter: str):
    """
    Args:
      test_filter: A string containing the test filter SQL clause to use.
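          Illustrative example (hypothetical filter): a clause such as
          'AND test_id LIKE "ninja://my_suite%"' would restrict a query to
          this suite's test IDs.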
553    """
554    super(FixedQueryGenerator, self).__init__(builder)
555    self._test_filter = test_filter
556
557  def SplitQuery(self) -> None:
558    raise QuerySplitError('Tried to split a query without any test IDs to use, '
559                          'use --large-query-mode')
560
561  def GetClauses(self) -> List[str]:
562    return [self._test_filter]
563# pylint: enable=abstract-method
564
565
566# pylint: disable=abstract-method
567class SplitQueryGenerator(BaseQueryGenerator):
568  """Concrete test filter that can be split to a desired size."""

  def __init__(self, builder: data_types.BuilderEntry, test_ids: List[str],
               target_num_samples: int):
    """
    Args:
      test_ids: A list of strings containing the test IDs to use in the test
          filter.
      target_num_samples: The target/max number of samples to get from each
          query that uses clauses from this test filter.
    """
    super(SplitQueryGenerator, self).__init__(builder)
    self._test_id_lists = []
    self._target_num_samples = target_num_samples
    self._clauses = []
    self._PerformInitialSplit(test_ids)

  def _PerformInitialSplit(self, test_ids: List[str]) -> None:
586    """Evenly splits |test_ids| into lists that are  ~|_target_num_samples| long

    Only to be called from the constructor.

    Args:
      test_ids: A list of test IDs to split and assign to the _test_id_lists
          member.
    """
    assert isinstance(test_ids[0], six.string_types)

    num_lists = int(math.ceil(float(len(test_ids)) / self._target_num_samples))
    list_size = int(math.ceil(float(len(test_ids)) / num_lists))

    split_lists = []
    start = 0
    for _ in range(num_lists):
      end = min(len(test_ids), start + list_size)
      split_lists.append(test_ids[start:end])
      start = end
    self._test_id_lists = split_lists
    self._GenerateClauses()

  def _GenerateClauses(self) -> None:
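    # Each clause narrows a query to one slice of the test IDs. The IDs are
    # joined into the clause verbatim, so they are presumably already quoted
    # SQL string literals. Illustrative example (hypothetical IDs): an id_list
    # of ['"ninja://suite/test_a"', '"ninja://suite/test_b"'] produces
    # AND test_id IN UNNEST(["ninja://suite/test_a", "ninja://suite/test_b"]).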
    test_filter_clauses = []
    for id_list in self._test_id_lists:
      clause = 'AND test_id IN UNNEST([%s])' % ', '.join(id_list)
      test_filter_clauses.append(clause)
    self._clauses = test_filter_clauses

  def SplitQuery(self) -> None:
    def _SplitListInHalf(l: list) -> Tuple[list, list]:
      assert len(l) > 1
      front = l[:len(l) // 2]
      back = l[len(l) // 2:]
      return front, back

    tmp_test_id_lists = []
    for til in self._test_id_lists:
      if len(til) <= 1:
        raise QuerySplitError(
            'Cannot split query any further, try lowering --num-samples')
      front, back = _SplitListInHalf(til)
      tmp_test_id_lists.append(front)
      tmp_test_id_lists.append(back)
    self._test_id_lists = tmp_test_id_lists
    self._GenerateClauses()

  def GetClauses(self) -> List[str]:
    return self._clauses
# pylint: enable=abstract-method


def GenerateBigQueryCommand(project: str,
                            parameters: QueryParameters,
                            batch: bool = True) -> List[str]:
  """Generate a BigQuery commandline.

  Does not contain the actual query, as that is passed in via stdin.

  Args:
    project: A string containing the billing project to use for BigQuery.
    parameters: A dict specifying parameters to substitute in the query in
        the format {type: {key: value}}. For example, the dict:
        {'INT64': {'num_builds': 5}}
        would result in --parameter=num_builds:INT64:5 being passed to BigQuery.
    batch: Whether to run the query in batch mode or not. Batching adds a
        variable amount of overhead since it means the query has to wait for
        idle resources, but also allows for much better parallelism.

  Returns:
    A list containing the BigQuery commandline, suitable to be passed to a
    method from the subprocess module.
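
  Example (illustrative, with a hypothetical project name):
    GenerateBigQueryCommand('my-project', {'INT64': {'num_builds': 5}})
    returns:
    ['bq', 'query', '--max_rows=2147483647', '--format=json',
     '--project_id=my-project', '--use_legacy_sql=false', '--batch',
     '--parameter=num_builds:INT64:5']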
658  """
659  cmd = [
660      'bq',
661      'query',
662      '--max_rows=%d' % MAX_ROWS,
663      '--format=json',
664      '--project_id=%s' % project,
665      '--use_legacy_sql=false',
666  ]
667
668  if batch:
669    cmd.append('--batch')
670
671  for parameter_type, parameter_pairs in parameters.items():
672    for k, v in parameter_pairs.items():
673      cmd.append('--parameter=%s:%s:%s' % (k, parameter_type, v))
674  return cmd
675
676
677def _StripPrefixFromBuildId(build_id: str) -> str:
678  # Build IDs provided by ResultDB are prefixed with "build-"
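  # (e.g. a hypothetical "build-1234" becomes "1234").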
  split_id = build_id.split('-')
  assert len(split_id) == 2
  return split_id[-1]


def _ConvertActualResultToExpectationFileFormat(actual_result: str) -> str:
  # Web tests use ResultDB's ABORT value for both test timeouts and device
  # failures, but Abort is not defined in typ. So, map it to timeout now.
  if actual_result == 'ABORT':
    actual_result = json_results.ResultType.Timeout
  # The result reported to ResultDB is in the format PASS/FAIL, while the
  # expected results in an expectation file are in the format Pass/Failure.
  return expectations_parser.RESULT_TAGS[actual_result]


class RateLimitError(Exception):
  """Exception raised when BigQuery hits a rate limit error."""


class MemoryLimitError(Exception):
  """Exception raised when BigQuery hits its hard memory limit."""


class QuerySplitError(Exception):
  """Exception raised when a query cannot be split any further."""
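

# Illustrative usage sketch (hypothetical, not part of the original module):
# a concrete querier subclasses BigQueryQuerier and implements the
# NotImplementedError hooks above, roughly like so:
#
#   class MySuiteQuerier(BigQueryQuerier):
#     def _StripPrefixFromTestId(self, test_id):
#       return test_id.split('.')[-1]
#
#     def _GetRelevantExpectationFilesForQueryResult(self, query_result):
#       return None  # Treat all expectation files as relevant.
#
#     def _GetQueryGeneratorForBuilder(self, builder):
#       return FixedQueryGenerator(builder, 'AND test_id LIKE "%my_suite%"')
#
#     def _GetActiveBuilderQuery(self, builder_type,
#                                include_internal_builders):
#       return 'SELECT DISTINCT builder_name FROM ...'  # Suite-specific SQL.
#
#   querier = MySuiteQuerier(suite='my_suite', project='my-project',
#                            num_samples=100, large_query_mode=False,
#                            num_jobs=None)
#   unmatched = querier.FillExpectationMapForBuilders(expectation_map, builders)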