xref: /aosp_15_r20/tools/asuite/atest/module_info.py (revision c2e18aaa1096c836b086f94603d04f4eb9cf37f5)
1# Copyright 2018, The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Module Info class used to hold cached module-info.json."""
16
17# pylint: disable=too-many-lines
18from __future__ import annotations
19
20import collections
21import json
22import logging
23import os
24from pathlib import Path
25import pickle
26import re
27import shutil
28import sqlite3
29import sys
30import tempfile
31import time
32from typing import Any, Callable, Dict, List, Set, Tuple
33
34from atest import atest_utils
35from atest import constants
36from atest.atest_enum import DetectType, ExitCode
37from atest.metrics import metrics
38
39
40# JSON file generated by build system that lists all buildable targets.
41_MODULE_INFO = 'module-info.json'
42# JSON file generated by build system that lists dependencies for java.
43_JAVA_DEP_INFO = 'module_bp_java_deps.json'
44# JSON file generated by build system that lists dependencies for cc.
45_CC_DEP_INFO = 'module_bp_cc_deps.json'
46# JSON file generated by atest merged the content from module-info,
47# module_bp_java_deps.json, and module_bp_cc_deps.
48_MERGED_INFO = 'atest_merged_dep.json'
49_DB_VERSION = 2
50_DB_NAME = f'module-info.{_DB_VERSION}.db'
51_NAME_MODULE_TABLE = 'modules'
52_PATH_MODULE_TABLE = 'path_modules'
53
54
55Module = Dict[str, Any]
56
57
58def load_from_file(
59    module_file: Path = None,
60    force_build: bool = False,
61) -> ModuleInfo:
62  """Factory method that initializes ModuleInfo from the build-generated
63
64  JSON file
65  """
66  loader = Loader(
67      module_file=module_file,
68      force_build=force_build,
69      need_merge_fn=lambda: False,
70  )
71
72  mi = loader.load()
73
74  return mi
75
76
77def load_from_dict(name_to_module_info: Dict[str, Any]) -> ModuleInfo:
78  """Factory method that initializes ModuleInfo from a dictionary."""
79  path_to_module_info = get_path_to_module_info(name_to_module_info)
80  return ModuleInfo(
81      name_to_module_info=name_to_module_info,
82      path_to_module_info=path_to_module_info,
83      get_testable_modules=lambda s: _get_testable_modules(
84          name_to_module_info, path_to_module_info, s
85      ),
86  )
87
88
89def create_empty() -> ModuleInfo:
90  """Factory method that initializes an empty ModuleInfo."""
91  return ModuleInfo()
92
93
94def load(
95    force_build: bool = False, sqlite_module_cache: bool = False
96) -> ModuleInfo:
97  """Factory method that initializes ModuleInfo from the build-generated
98
99  JSON or Sqlite file.
100  """
101  mod_start = time.time()
102  loader = Loader(
103      force_build=force_build, sqlite_module_cache=sqlite_module_cache
104  )
105  mod_stop = time.time() - mod_start
106  metrics.LocalDetectEvent(
107      detect_type=DetectType.MODULE_INFO_INIT_MS, result=int(mod_stop * 1000)
108  )
109
110  return loader.load(save_timestamps=True)
111
112
113def metrics_timer(func):
114  """Decorator method for sending data to metrics."""
115
116  def wrapper(*args, **kwargs):
117    start = time.time()
118    result = func(*args, **kwargs)
119    elapsed_time = int(time.time() - start)
120    metrics.LocalDetectEvent(
121        detect_type=DetectType.TESTABLE_MODULES, result=elapsed_time
122    )
123    return result
124
125  return wrapper
126
127
128class Loader:
129  """Class that handles load and merge processes."""
130
131  def __init__(
132      self,
133      module_file: Path = None,
134      force_build: bool = False,
135      sqlite_module_cache: bool = False,
136      need_merge_fn: Callable = None,
137  ):
138    logging.debug(
139        'Creating module info loader object with module_file: %s, force_build:'
140        ' %s, sqlite_module_cache: %s, need_merge_fn: %s',
141        module_file,
142        force_build,
143        sqlite_module_cache,
144        need_merge_fn,
145    )
146    self.java_dep_path = atest_utils.get_build_out_dir('soong', _JAVA_DEP_INFO)
147    self.cc_dep_path = atest_utils.get_build_out_dir('soong', _CC_DEP_INFO)
148    self.merged_dep_path = atest_utils.get_product_out(_MERGED_INFO)
149    logging.debug(
150        'java_dep_path: %s, cc_dep_path: %s, merged_dep_path: %s',
151        self.java_dep_path,
152        self.cc_dep_path,
153        self.merged_dep_path,
154    )
155
156    self.sqlite_module_cache = sqlite_module_cache
157    logging.debug('sqlite_module_cache: %s', sqlite_module_cache)
158    if self.sqlite_module_cache:
159      self.cache_file = atest_utils.get_product_out(_DB_NAME)
160      self.save_cache_async = self._save_db_async
161      self.load_from_cache = self._load_from_db
162    else:
163      self.cache_file = self.merged_dep_path
164      self.save_cache_async = self._save_json_async
165      self.load_from_cache = self._load_from_json
166
167    if need_merge_fn:
168      self.save_cache_async = lambda _, __: None
169
170    self.update_merge_info = False
171    self.module_index = atest_utils.get_index_path(
172        f'suite-modules.{_DB_VERSION}.idx'
173    )
174    self.module_index_proc = None
175    logging.debug('module_index: %s', self.module_index)
176
177    if module_file:
178      self.mod_info_file_path = Path(module_file)
179      self.load_module_info = self._load_module_info_from_file_wo_merging
180    else:
181      self.mod_info_file_path = atest_utils.get_product_out(_MODULE_INFO)
182      if force_build:
183        logging.debug('Triggering module info build by force build.')
184        build()
185      elif not self.mod_info_file_path.is_file():
186        logging.debug(
187            'Triggering module info build due to module info file path %s not'
188            ' exist.',
189            self.mod_info_file_path,
190        )
191        build()
192
193      self.update_merge_info = self.need_merge_module_info()
194      self.load_module_info = self._load_module_info_file
195
196    logging.debug(
197        'Executing load_module_info function %s', self.load_module_info
198    )
199    self.name_to_module_info, self.path_to_module_info = self.load_module_info()
200
201    logging.debug('Completed creating module info loader object')
202
203  def load(self, save_timestamps: bool = False):
204    logging.debug('Loading ModuleInfo. save_timestamps: %s', save_timestamps)
205    if save_timestamps:
206      atest_utils.run_multi_proc(func=atest_utils.save_build_files_timestamp)
207
208    return ModuleInfo(
209        name_to_module_info=self.name_to_module_info,
210        path_to_module_info=self.path_to_module_info,
211        mod_info_file_path=self.mod_info_file_path,
212        get_testable_modules=self.get_testable_modules,
213    )
214
215  def _load_module_info_file(self):
216    """Load module-info.json file as ModuleInfo and merge related JSON files
217
218    whenever required.
219
220    Returns:
221        Dict of module name to module info and dict of module path to module
222        info.
223    """
224    # +--------------+                  +----------------------------------+
225    # | ModuleInfo() |                  | ModuleInfo(module_file=foo.json) |
226    # +-------+------+                  +----------------+-----------------+
227    #         | module_info.build()                      | load
228    #         v                                          V
229    # +--------------------------+         +--------------------------+
230    # | module-info.json         |         | foo.json                 |
231    # | module_bp_cc_deps.json   |         | module_bp_cc_deps.json   |
232    # | module_bp_java_deps.json |         | module_bp_java_deps.json |
233    # +--------------------------+         +--------------------------+
234    #         |                                          |
235    #         | _merge_soong_info() <--------------------+
236    #         v
237    # +============================+
238    # |  $ANDROID_PRODUCT_OUT      |
239    # |    /atest_merged_dep.json  |--> load as module info.
240    # +============================+
241    if not self.update_merge_info:
242      return self.load_from_cache()
243
244    name_modules, path_modules = self._load_from_json(merge=True)
245    self.save_cache_async(name_modules, path_modules)
246    self._save_testable_modules_async(name_modules, path_modules)
247
248    return name_modules, path_modules
249
250  def _load_module_info_from_file_wo_merging(self):
251    """Load module-info.json as ModuleInfo without merging."""
252    name_modules = atest_utils.load_json_safely(self.mod_info_file_path)
253    _add_missing_variant_modules(name_modules)
254
255    return name_modules, get_path_to_module_info(name_modules)
256
257  def _save_db_async(
258      self,
259      name_to_module_info: Dict[str, Any],
260      path_to_module_info: Dict[str, Any],
261  ):
262    """Save data to a Sqlite database in parallel."""
263    data_map = {
264        _NAME_MODULE_TABLE: name_to_module_info,
265        _PATH_MODULE_TABLE: path_to_module_info,
266    }
267    _save_data_async(
268        function=_create_db,
269        contents=data_map,
270        target_path=self.cache_file,
271    )
272
273  def _load_from_db(self) -> Tuple[Dict[str, Any], Dict[str, Any]]:
274    """Return a tuple of dicts by from SqliteDict."""
275    conn = sqlite3.connect(self.cache_file)
276    with conn:
277      name_to_module_info = SqliteDict(conn, _NAME_MODULE_TABLE)
278      path_to_module_info = SqliteDict(conn, _PATH_MODULE_TABLE)
279
280      return name_to_module_info, path_to_module_info
281
282  def _save_json_async(self, name_to_module_info: Dict[str, Any], _):
283    """Save data to a JSON format in parallel."""
284    _save_data_async(
285        function=_create_json,
286        contents=name_to_module_info,
287        target_path=self.cache_file,
288    )
289
290  def _load_from_json(self, merge: bool = False) -> Tuple[Dict, Dict]:
291    """Load or merge module info from json file.
292
293    Args:
294        merge: Boolean whether to merge build system infos.
295
296    Returns:
297        A tuple of (name_to_module_info, path_to_module_info).
298    """
299    start = time.time()
300    if merge:
301      name_info = self._merge_build_system_infos(
302          atest_utils.load_json_safely(self.mod_info_file_path)
303      )
304      duration = time.time() - start
305      logging.debug('Merging module info took %ss', duration)
306      metrics.LocalDetectEvent(
307          detect_type=DetectType.MODULE_MERGE_MS, result=int(duration * 1000)
308      )
309
310      return name_info, get_path_to_module_info(name_info)
311
312    name_info = atest_utils.load_json_safely(self.merged_dep_path)
313    duration = time.time() - start
314    logging.debug('Loading module info took %ss', duration)
315    metrics.LocalDetectEvent(
316        detect_type=DetectType.MODULE_LOAD_MS, result=int(duration * 1000)
317    )
318    logging.debug('Loading %s as module-info.', self.merged_dep_path)
319
320    return name_info, get_path_to_module_info(name_info)
321
322  def _save_testable_modules_async(
323      self,
324      name_to_module_info: Dict[str, Any],
325      path_to_module_info: Dict[str, Any],
326  ):
327    """Save testable modules in parallel."""
328    return atest_utils.run_multi_proc(
329        func=_get_testable_modules,
330        kwargs={
331            'name_to_module_info': name_to_module_info,
332            'path_to_module_info': path_to_module_info,
333            'index_path': self.module_index,
334        },
335    )
336
337  def need_merge_module_info(self):
338    """Check if needed to regenerate the cache file.
339
340    If the cache file is non-existent or testable module index is inexistent
341    or older than any of the JSON files used to generate it, the cache file
342    must re-generate.
343
344    Returns:
345        True when the cache file is older or non-existent, False otherwise.
346    """
347    if not self.cache_file.is_file():
348      return True
349
350    if not self.module_index.is_file():
351      return True
352
353    # The dependency input files should be generated at this point.
354    return any(
355        self.cache_file.stat().st_mtime < f.stat().st_mtime
356        for f in (self.mod_info_file_path, self.java_dep_path, self.cc_dep_path)
357    )
358
359  def _merge_build_system_infos(
360      self, name_to_module_info, java_bp_info_path=None, cc_bp_info_path=None
361  ):
362    """Merge the content of module-info.json and CC/Java dependency files
363
364    to name_to_module_info.
365
366    Args:
367        name_to_module_info: Dict of module name to module info dict.
368        java_bp_info_path: String of path to java dep file to load up. Used for
369          testing.
370        cc_bp_info_path: String of path to cc dep file to load up. Used for
371          testing.
372
373    Returns:
374        Dict of updated name_to_module_info.
375    """
376    # Merge _JAVA_DEP_INFO
377    if not java_bp_info_path:
378      java_bp_info_path = self.java_dep_path
379    java_bp_infos = atest_utils.load_json_safely(java_bp_info_path)
380    if java_bp_infos:
381      logging.debug('Merging Java build info: %s', java_bp_info_path)
382      name_to_module_info = merge_soong_info(name_to_module_info, java_bp_infos)
383    # Merge _CC_DEP_INFO
384    if not cc_bp_info_path:
385      cc_bp_info_path = self.cc_dep_path
386    cc_bp_infos = atest_utils.load_json_safely(cc_bp_info_path)
387    if cc_bp_infos:
388      logging.debug('Merging CC build info: %s', cc_bp_info_path)
389      # CC's dep json format is different with java.
390      # Below is the example content:
391      # {
392      #   "clang": "${ANDROID_ROOT}/bin/clang",
393      #   "clang++": "${ANDROID_ROOT}/bin/clang++",
394      #   "modules": {
395      #       "ACameraNdkVendorTest": {
396      #           "path": [
397      #                   "frameworks/av/camera/ndk"
398      #           ],
399      #           "srcs": [
400      #                   "frameworks/tests/AImageVendorTest.cpp",
401      #                   "frameworks/tests/ACameraManagerTest.cpp"
402      #           ],
403      name_to_module_info = merge_soong_info(
404          name_to_module_info, cc_bp_infos.get('modules', {})
405      )
406    # If $ANDROID_PRODUCT_OUT was not created in pyfakefs, simply return it
407    # without dumping atest_merged_dep.json in real.
408
409    # Adds the key into module info as a unique ID.
410    for key, info in name_to_module_info.items():
411      info[constants.MODULE_INFO_ID] = key
412
413    _add_missing_variant_modules(name_to_module_info)
414
415    return name_to_module_info
416
417  @metrics_timer
418  def get_testable_modules(self, suite=None):
419    """Return the testable modules of the given suite name.
420
421    Atest does not index testable modules against compatibility_suites. When
422    suite was given, or the index file was interrupted, always run
423    _get_testable_modules() and re-index.
424
425    Args:
426        suite: A string of suite name.
427
428    Returns:
429        If suite is not given, return all the testable modules in module
430        info, otherwise return only modules that belong to the suite.
431    """
432    modules = set()
433
434    if self.module_index.is_file():
435      modules = self.get_testable_modules_from_index(suite)
436    # If the modules.idx does not exist or invalid for any reason, generate
437    # a new one arbitrarily.
438    if not modules:
439      modules = self.get_testable_module_from_memory(suite)
440
441    return modules
442
443  def get_testable_modules_from_index(self, suite: str = None) -> Set[str]:
444    """Return the testable modules of the given suite name."""
445    suite_to_modules = {}
446    with open(self.module_index, 'rb') as cache:
447      try:
448        suite_to_modules = pickle.load(cache, encoding='utf-8')
449      except UnicodeDecodeError:
450        suite_to_modules = pickle.load(cache)
451      # when module indexing was interrupted.
452      except EOFError:
453        pass
454
455    return _filter_modules_by_suite(suite_to_modules, suite)
456
457  def get_testable_module_from_memory(self, suite: str = None) -> Set[str]:
458    """Return the testable modules of the given suite name."""
459    return _get_testable_modules(
460        name_to_module_info=self.name_to_module_info,
461        path_to_module_info=self.path_to_module_info,
462        index_path=self.module_index,
463        suite=suite,
464    )
465
466
467class ModuleInfo:
468  """Class that offers fast/easy lookup for Module related details."""
469
470  def __init__(
471      self,
472      name_to_module_info: Dict[str, Any] = None,
473      path_to_module_info: Dict[str, Any] = None,
474      mod_info_file_path: Path = None,
475      get_testable_modules: Callable = None,
476  ):
477    """Initialize the ModuleInfo object.
478
479    Load up the module-info.json file and initialize the helper vars.
480    Note that module-info.json does not contain all module dependencies,
481    therefore, Atest needs to accumulate dependencies defined in bp files.
482
483    Args:
484        name_to_module_info: Dict of name to module info.
485        path_to_module_info: Dict of path to module info.
486        mod_info_file_path: Path of module-info.json.
487        get_testable_modules: Function to get all testable modules.
488    """
489    #   +----------------------+     +----------------------------+
490    #   | $ANDROID_PRODUCT_OUT |     |$ANDROID_BUILD_TOP/out/soong|
491    #   |  /module-info.json   |     |  /module_bp_java_deps.json |
492    #   +-----------+----------+     +-------------+--------------+
493    #               |     _merge_soong_info()      |
494    #               +------------------------------+
495    #               |
496    #               v
497    # +----------------------------+  +----------------------------+
498    # |tempfile.NamedTemporaryFile |  |$ANDROID_BUILD_TOP/out/soong|
499    # +-------------+--------------+  |  /module_bp_cc_deps.json   |
500    #               |                 +-------------+--------------+
501    #               |     _merge_soong_info()       |
502    #               +-------------------------------+
503    #                              |
504    #                      +-------|
505    #                      v
506    #         +============================+
507    #         |  $ANDROID_PRODUCT_OUT      |
508    #         |    /atest_merged_dep.json  |--> load as module info.
509    #         +============================+
510    self.root_dir = os.environ.get(constants.ANDROID_BUILD_TOP)
511
512    self.name_to_module_info = name_to_module_info or {}
513    self.path_to_module_info = path_to_module_info or {}
514    self.mod_info_file_path = mod_info_file_path
515    self._get_testable_modules = get_testable_modules
516
517  def is_module(self, name):
518    """Return True if name is a module, False otherwise."""
519    info = self.get_module_info(name)
520    # From aosp/2293302 it started merging all modules' dependency in bp
521    # even the module is not be exposed to make, and those modules could not
522    # be treated as a build target using m. Only treat input name as module
523    # if it also has the module_name attribute which means it could be a
524    # build target for m.
525    if info and info.get(constants.MODULE_NAME):
526      return True
527    return False
528
529  def get_paths(self, name) -> list[str]:
530    """Return paths of supplied module name, Empty list if non-existent."""
531    info = self.get_module_info(name)
532    if info:
533      return info.get(constants.MODULE_PATH, [])
534    return []
535
536  def get_module_names(self, rel_module_path):
537    """Get the modules that all have module_path.
538
539    Args:
540        rel_module_path: path of module in module-info.json
541
542    Returns:
543        List of module names.
544    """
545    return _get_module_names(self.path_to_module_info, rel_module_path)
546
547  def get_module_info(self, mod_name):
548    """Return dict of info for given module name, None if non-existence."""
549    return self.name_to_module_info.get(mod_name)
550
551  @staticmethod
552  def is_suite_in_compatibility_suites(suite, mod_info):
553    """Check if suite exists in the compatibility_suites of module-info.
554
555    Args:
556        suite: A string of suite name.
557        mod_info: Dict of module info to check.
558
559    Returns:
560        True if it exists in mod_info, False otherwise.
561    """
562    if not isinstance(mod_info, dict):
563      return False
564    return suite in mod_info.get(constants.MODULE_COMPATIBILITY_SUITES, [])
565
566  def get_testable_modules(self, suite=None):
567    return self._get_testable_modules(suite)
568
569  @staticmethod
570  def is_tradefed_testable_module(info: Dict[str, Any]) -> bool:
571    """Check whether the module is a Tradefed executable test."""
572    if not info:
573      return False
574    if not info.get(constants.MODULE_INSTALLED, []):
575      return False
576    return ModuleInfo.has_test_config(info)
577
578  @staticmethod
579  def is_mobly_module(info: Dict[str, Any]) -> bool:
580    """Check whether the module is a Mobly test.
581
582    Note: Only python_test_host modules marked with a test_options tag of
583      "mobly" is considered a Mobly module.
584
585    Args:
586        info: Dict of module info to check.
587
588    Returns:
589        True if this is a Mobly test module, False otherwise.
590    """
591    return constants.MOBLY_TEST_OPTIONS_TAG in info.get(
592        constants.MODULE_TEST_OPTIONS_TAGS, []
593    )
594
595  def is_testable_module(self, info: Dict[str, Any]) -> bool:
596    """Check if module is something we can test.
597
598    A module is testable if:
599      - it's a tradefed testable module, or
600      - it's a Mobly module, or
601      - it's a robolectric module (or shares path with one).
602
603    Args:
604        info: Dict of module info to check.
605
606    Returns:
607        True if we can test this module, False otherwise.
608    """
609    return _is_testable_module(
610        self.name_to_module_info, self.path_to_module_info, info
611    )
612
613  @staticmethod
614  def has_test_config(info: Dict[str, Any]) -> bool:
615    """Validate if this module has a test config.
616
617    A module can have a test config in the following manner:
618      - test_config be set in module-info.json.
619      - Auto-generated config via the auto_test_config key
620        in module-info.json.
621
622    Args:
623        info: Dict of module info to check.
624
625    Returns:
626        True if this module has a test config, False otherwise.
627    """
628    return bool(
629        info.get(constants.MODULE_TEST_CONFIG, [])
630        or info.get('auto_test_config', [])
631    )
632
633  def is_legacy_robolectric_test(self, info: Dict[str, Any]) -> bool:
634    """Return whether the module_name is a legacy Robolectric test"""
635    return _is_legacy_robolectric_test(
636        self.name_to_module_info, self.path_to_module_info, info
637    )
638
639  def get_robolectric_test_name(self, info: Dict[str, Any]) -> str:
640    """Returns runnable robolectric module name.
641
642    This method is for legacy robolectric tests and returns one of associated
643    modules. The pattern is determined by the amount of shards:
644
645    10 shards:
646        FooTests -> RunFooTests0, RunFooTests1 ... RunFooTests9
647    No shard:
648        FooTests -> RunFooTests
649
650    Arg:
651        info: Dict of module info to check.
652
653    Returns:
654        String of the first-matched associated module that belongs to the
655        actual robolectric module, None if nothing has been found.
656    """
657    return _get_robolectric_test_name(
658        self.name_to_module_info, self.path_to_module_info, info
659    )
660
661  def is_robolectric_test(self, module_name):
662    """Check if the given module is a robolectric test.
663
664    Args:
665        module_name: String of module to check.
666
667    Returns:
668        Boolean whether it's a robotest or not.
669    """
670    if self.get_robolectric_type(module_name):
671      return True
672    return False
673
674  def get_robolectric_type(self, module_name: str) -> int:
675    """Check if the given module is a robolectric test and return type of it.
676
677    Robolectric declaration is converting from Android.mk to Android.bp, and
678    in the interim Atest needs to support testing both types of tests.
679
680    The modern robolectric tests defined by 'android_robolectric_test' in an
681    Android.bp file can can be run in Tradefed Test Runner:
682
683        SettingsRoboTests -> Tradefed Test Runner
684
685    Legacy tests defined in an Android.mk can only run with the 'make' way.
686
687        SettingsRoboTests -> make RunSettingsRoboTests0
688
689    To determine whether the test is a modern/legacy robolectric test:
690        1. If the 'robolectric-test` in the compatibility_suites, it's a
691           modern one, otherwise it's a legacy test. This is accurate since
692           aosp/2308586 already set the test suite of `robolectric-test`
693           for all `modern` Robolectric tests in Soong.
694        2. Traverse all modules share the module path. If one of the
695           modules has a ROBOLECTRIC class, it's a legacy robolectric test.
696
697    Args:
698        module_name: String of module to check.
699
700    Returns:
701        0: not a robolectric test.
702        1: a modern robolectric test(defined in Android.bp)
703        2: a legacy robolectric test(defined in Android.mk)
704    """
705    info = self.get_module_info(module_name)
706    if not info:
707      return 0
708    # Some Modern mode Robolectric test has related module which compliant
709    # with the Legacy Robolectric test. In this case, the Modern mode
710    # Robolectric tests should be prior to the Legacy mode.
711    if self.is_modern_robolectric_test(info):
712      return constants.ROBOTYPE_MODERN
713    if self.is_legacy_robolectric_test(info):
714      return constants.ROBOTYPE_LEGACY
715    return 0
716
717  def get_instrumentation_target_apps(self, module_name: str) -> Dict:
718    """Return target APKs of an instrumentation test.
719
720    Returns:
721        A dict of target module and target APK(s). e.g.
722        {"FooService": {"/path/to/the/FooService.apk"}}
723    """
724    # 1. Determine the actual manifest filename from an Android.bp(if any)
725    manifest = self.get_filepath_from_module(module_name, 'AndroidManifest.xml')
726    bpfile = self.get_filepath_from_module(module_name, 'Android.bp')
727    if bpfile.is_file():
728      bp_info = atest_utils.get_bp_content(bpfile, 'android_test')
729      if not bp_info or not bp_info.get(module_name):
730        return {}
731      manifest = self.get_filepath_from_module(
732          module_name, bp_info.get(module_name).get('manifest')
733      )
734    xml_info = atest_utils.get_manifest_info(manifest)
735    # 2. Translate package name to a module name.
736    package = xml_info.get('package')
737    target_package = xml_info.get('target_package')
738    # Ensure it's an instrumentation test(excluding self-instrmented)
739    if target_package and package != target_package:
740      logging.debug('Found %s an instrumentation test.', module_name)
741      metrics.LocalDetectEvent(
742          detect_type=DetectType.FOUND_INSTRUMENTATION_TEST, result=1
743      )
744      target_module = self.get_target_module_by_pkg(
745          package=target_package, search_from=manifest.parent
746      )
747      if target_module:
748        return self.get_artifact_map(target_module)
749    return {}
750
751  # pylint: disable=anomalous-backslash-in-string
752  def get_target_module_by_pkg(self, package: str, search_from: Path) -> str:
753    """Translate package name to the target module name.
754
755    This method is dedicated to determine the target module by translating
756    a package name.
757
758    Phase 1: Find out possible manifest files among parent directories.
759    Phase 2. Look for the defined package fits the given name, and ensure
760             it is not a persistent app.
761    Phase 3: Translate the manifest path to possible modules. A valid module
762             must fulfill:
763             1. The 'class' type must be ['APPS'].
764             2. It is not a Robolectric test.
765
766    Returns:
767        A string of module name.
768    """
769    xmls = []
770    for pth in search_from.parents:
771      if pth == Path(self.root_dir):
772        break
773      for name in os.listdir(pth):
774        if pth.joinpath(name).is_file():
775          match = re.match('.*AndroidManifest.*\.xml$', name)
776          if match:
777            xmls.append(os.path.join(pth, name))
778    possible_modules = []
779    for xml in xmls:
780      rel_dir = str(Path(xml).relative_to(self.root_dir).parent)
781      logging.debug('Looking for package "%s" in %s...', package, xml)
782      xml_info = atest_utils.get_manifest_info(xml)
783      if xml_info.get('package') == package:
784        if xml_info.get('persistent'):
785          logging.debug('%s is a persistent app.', package)
786          continue
787        for _m in self.path_to_module_info.get(rel_dir):
788          possible_modules.append(_m)
789    if possible_modules:
790      for mod in possible_modules:
791        name = mod.get('module_name')
792        if mod.get('class') == ['APPS'] and not self.is_robolectric_test(name):
793          return name
794    return ''
795
796  def get_artifact_map(self, module_name: str) -> Dict:
797    """Get the installed APK path of the given module."""
798    target_mod_info = self.get_module_info(module_name)
799    artifact_map = {}
800    if target_mod_info:
801      apks = set()
802      artifacts = target_mod_info.get('installed')
803      for artifact in artifacts:
804        if Path(artifact).suffix == '.apk':
805          apks.add(os.path.join(self.root_dir, artifact))
806      artifact_map.update({module_name: apks})
807    return artifact_map
808
809  def is_auto_gen_test_config(self, module_name):
810    """Check if the test config file will be generated automatically.
811
812    Args:
813        module_name: A string of the module name.
814
815    Returns:
816        True if the test config file will be generated automatically.
817    """
818    if self.is_module(module_name):
819      mod_info = self.get_module_info(module_name)
820      auto_test_config = mod_info.get('auto_test_config', [])
821      return auto_test_config and auto_test_config[0]
822    return False
823
824  @staticmethod
825  def is_legacy_robolectric_class(info: Dict[str, Any]) -> bool:
826    """Check if the class is `ROBOLECTRIC`
827
828    This method is for legacy robolectric tests that the associated modules
829    contain:
830        'class': ['ROBOLECTRIC']
831
832    Args:
833        info: ModuleInfo to check.
834
835    Returns:
836        True if the attribute class in mod_info is ROBOLECTRIC, False
837        otherwise.
838    """
839    if info:
840      module_classes = info.get(constants.MODULE_CLASS, [])
841      return (
842          module_classes
843          and module_classes[0] == constants.MODULE_CLASS_ROBOLECTRIC
844      )
845    return False
846
847  def is_native_test(self, module_name):
848    """Check if the input module is a native test.
849
850    Args:
851        module_name: A string of the module name.
852
853    Returns:
854        True if the test is a native test, False otherwise.
855    """
856    mod_info = self.get_module_info(module_name)
857    return constants.MODULE_CLASS_NATIVE_TESTS in mod_info.get(
858        constants.MODULE_CLASS, []
859    )
860
861  def has_mainline_modules(
862      self, module_name: str, mainline_binaries: List[str]
863  ) -> bool:
864    """Check if the mainline modules are in module-info.
865
866    Args:
867        module_name: A string of the module name.
868        mainline_binaries: A list of mainline module binaries.
869
870    Returns:
871        True if mainline_binaries is in module-info, False otherwise.
872    """
873    mod_info = self.get_module_info(module_name)
874    # Check 'test_mainline_modules' attribute of the module-info.json.
875    mm_in_mf = mod_info.get(constants.MODULE_MAINLINE_MODULES, [])
876    ml_modules_set = set(mainline_binaries)
877    if mm_in_mf:
878      return contains_same_mainline_modules(ml_modules_set, set(mm_in_mf))
879    for test_config in mod_info.get(constants.MODULE_TEST_CONFIG, []):
880      # Check the value of 'mainline-param' in the test config.
881      if not self.is_auto_gen_test_config(module_name):
882        return contains_same_mainline_modules(
883            ml_modules_set,
884            atest_utils.get_mainline_param(
885                os.path.join(self.root_dir, test_config)
886            ),
887        )
888      # Unable to verify mainline modules in an auto-gen test config.
889      logging.debug(
890          '%s is associated with an auto-generated test config.', module_name
891      )
892      return True
893    return False
894
895  def get_filepath_from_module(self, module_name: str, filename: str) -> Path:
896    """Return absolute path of the given module and filename."""
897    mod_path = self.get_paths(module_name)
898    if mod_path:
899      return Path(self.root_dir).joinpath(mod_path[0], filename)
900    return Path()
901
902  def get_module_dependency(self, module_name, depend_on=None):
903    """Get the dependency sets for input module.
904
905    Recursively find all the dependencies of the input module.
906
907    Args:
908        module_name: String of module to check.
909        depend_on: The list of parent dependencies.
910
911    Returns:
912        Set of dependency modules.
913    """
914    if not depend_on:
915      depend_on = set()
916    deps = set()
917    mod_info = self.get_module_info(module_name)
918    if not mod_info:
919      return deps
920    mod_deps = set(mod_info.get(constants.MODULE_DEPENDENCIES, []))
921    # Remove item in deps if it already in depend_on:
922    mod_deps = mod_deps - depend_on
923    deps = deps.union(mod_deps)
924    for mod_dep in mod_deps:
925      deps = deps.union(
926          set(
927              self.get_module_dependency(
928                  mod_dep, depend_on=depend_on.union(deps)
929              )
930          )
931      )
932    return deps
933
934  def get_install_module_dependency(self, module_name, depend_on=None):
935    """Get the dependency set for the given modules with installed path.
936
937    Args:
938        module_name: String of module to check.
939        depend_on: The list of parent dependencies.
940
941    Returns:
942        Set of dependency modules which has installed path.
943    """
944    install_deps = set()
945    deps = self.get_module_dependency(module_name, depend_on)
946    logging.debug('%s depends on: %s', module_name, deps)
947    for module in deps:
948      mod_info = self.get_module_info(module)
949      if mod_info and mod_info.get(constants.MODULE_INSTALLED, []):
950        install_deps.add(module)
951    logging.debug(
952        'modules %s required by %s were not installed',
953        install_deps,
954        module_name,
955    )
956    return install_deps
957
958  @staticmethod
959  def is_unit_test(mod_info):
960    """Return True if input module is unit test, False otherwise.
961
962    Args:
963        mod_info: ModuleInfo to check.
964
965    Returns:
966        True if input module is unit test, False otherwise.
967    """
968    return mod_info.get(constants.MODULE_IS_UNIT_TEST, '') == 'true'
969
970  def is_host_unit_test(self, info: Dict[str, Any]) -> bool:
971    """Return True if input module is host unit test, False otherwise.
972
973    Args:
974        info: ModuleInfo to check.
975
976    Returns:
977        True if input module is host unit test, False otherwise.
978    """
979    return self.is_tradefed_testable_module(
980        info
981    ) and self.is_suite_in_compatibility_suites('host-unit-tests', info)
982
983  def is_modern_robolectric_test(self, info: Dict[str, Any]) -> bool:
984    """Return whether 'robolectric-tests' is in 'compatibility_suites'."""
985    return self.is_tradefed_testable_module(
986        info
987    ) and self.is_robolectric_test_suite(info)
988
989  def is_robolectric_test_suite(self, mod_info) -> bool:
990    """Return True if 'robolectric-tests' in the compatibility_suites.
991
992    Args:
993        mod_info: ModuleInfo to check.
994
995    Returns:
996        True if the 'robolectric-tests' is in the compatibility_suites,
997        False otherwise.
998    """
999    return self.is_suite_in_compatibility_suites('robolectric-tests', mod_info)
1000
1001  def is_ravenwood_test(self, info: Dict[str, Any]) -> bool:
1002    """Return whether 'ravenwood-tests' is in 'compatibility_suites'."""
1003    return self.is_tradefed_testable_module(
1004        info
1005    ) and self.is_ravenwood_test_suite(info)
1006
1007  def is_ravenwood_test_suite(self, mod_info) -> bool:
1008    """Return True if 'ravenwood-tests' in the compatibility_suites.
1009
1010    Args:
1011        mod_info: ModuleInfo to check.
1012
1013    Returns:
1014        True if the 'ravenwood-tests' is in the compatibility_suites,
1015        False otherwise.
1016    """
1017    return self.is_suite_in_compatibility_suites('ravenwood-tests', mod_info)
1018
1019  def is_device_driven_test(self, mod_info):
1020    """Return True if input module is device driven test, False otherwise.
1021
1022    Args:
1023        mod_info: ModuleInfo to check.
1024
1025    Returns:
1026        True if input module is device driven test, False otherwise.
1027    """
1028    if self.is_robolectric_test_suite(mod_info):
1029      return False
1030    if self.is_ravenwood_test_suite(mod_info):
1031      return False
1032
1033    return self.is_tradefed_testable_module(
1034        mod_info
1035    ) and 'DEVICE' in mod_info.get(constants.MODULE_SUPPORTED_VARIANTS, [])
1036
1037  def is_host_driven_test(self, mod_info):
1038    """Return True if input module is host driven test, False otherwise.
1039
1040    Args:
1041        mod_info: ModuleInfo to check.
1042
1043    Returns:
1044        True if input module is host driven test, False otherwise.
1045    """
1046    return self.is_tradefed_testable_module(
1047        mod_info
1048    ) and 'HOST' in mod_info.get(constants.MODULE_SUPPORTED_VARIANTS, [])
1049
1050  def _any_module(self, _: Module) -> bool:
1051    return True
1052
1053  def get_all_tests(self):
1054    """Get a list of all the module names which are tests."""
1055    return self._get_all_modules(type_predicate=self.is_testable_module)
1056
1057  def get_all_unit_tests(self):
1058    """Get a list of all the module names which are unit tests."""
1059    return self._get_all_modules(type_predicate=ModuleInfo.is_unit_test)
1060
1061  def get_all_host_unit_tests(self):
1062    """Get a list of all the module names which are host unit tests."""
1063    return self._get_all_modules(type_predicate=self.is_host_unit_test)
1064
1065  def get_all_device_driven_tests(self):
1066    """Get a list of all the module names which are device driven tests."""
1067    return self._get_all_modules(type_predicate=self.is_device_driven_test)
1068
1069  def _get_all_modules(self, type_predicate=None):
1070    """Get a list of all the module names that passed the predicate."""
1071    modules = []
1072    type_predicate = type_predicate or self._any_module
1073    for mod_name, mod_info in self.name_to_module_info.items():
1074      if mod_info.get(constants.MODULE_NAME, '') == mod_name:
1075        if type_predicate(mod_info):
1076          modules.append(mod_name)
1077    return modules
1078
1079  def get_modules_by_path_in_srcs(
1080      self, path: str, testable_modules_only: bool = False
1081  ) -> Set[str]:
1082    """Get the module name that the given path belongs to.(in 'srcs')
1083
1084    Args:
1085        path: file path which is relative to ANDROID_BUILD_TOP.
1086        testable_modules_only: boolean flag which determines whether search
1087          testable modules only or not.
1088
1089    Returns:
1090        A set of string for matched module names, empty set if nothing find.
1091    """
1092    modules = set()
1093
1094    for mod_name in (
1095        self.get_testable_modules()
1096        if testable_modules_only
1097        else self.name_to_module_info.keys()
1098    ):
1099      m_info = self.get_module_info(mod_name)
1100      if m_info:
1101        for src in m_info.get(constants.MODULE_SRCS, []):
1102          if src in path:
1103            modules.add(mod_name)
1104
1105    return modules
1106
1107  def get_modules_by_path(
1108      self, path: str, testable_modules_only: bool = False
1109  ) -> set[str]:
1110    """Get the module names that the give path belongs to.
1111
1112    Args:
1113        path: dir path for searching among `path` in module information.
1114        testable_modules_only: boolean flag which determines whether search
1115          testable modules only or not.
1116
1117    Returns:
1118        A set of module names.
1119    """
1120    modules = set()
1121    is_testable_module_fn = (
1122        self.is_testable_module if testable_modules_only else lambda _: True
1123    )
1124
1125    m_infos = self.path_to_module_info.get(path)
1126    if m_infos:
1127      modules = {
1128          info.get(constants.MODULE_NAME)
1129          for info in m_infos
1130          if is_testable_module_fn(info)
1131      }
1132
1133    return modules
1134
1135  def get_modules_by_include_deps(
1136      self, deps: Set[str], testable_module_only: bool = False
1137  ) -> Set[str]:
1138    """Get the matched module names for the input dependencies.
1139
1140    Args:
1141        deps: A set of string for dependencies.
1142        testable_module_only: Option if only want to get testable module.
1143
1144    Returns:
1145        A set of matched module names for the input dependencies.
1146    """
1147    modules = set()
1148
1149    for mod_name in (
1150        self.get_testable_modules()
1151        if testable_module_only
1152        else self.name_to_module_info.keys()
1153    ):
1154      mod_info = self.get_module_info(mod_name)
1155      if mod_info and deps.intersection(
1156          set(mod_info.get(constants.MODULE_DEPENDENCIES, []))
1157      ):
1158        modules.add(mod_info.get(constants.MODULE_NAME))
1159    return modules
1160
1161  def get_installed_paths(self, module_name: str) -> List[Path]:
1162    """Return installed path from module info."""
1163    mod_info = self.get_module_info(module_name)
1164    if not mod_info:
1165      return []
1166
1167    def _to_abs_path(p):
1168      if os.path.isabs(p):
1169        return Path(p)
1170      return Path(os.getenv(constants.ANDROID_BUILD_TOP), p)
1171
1172    return [_to_abs_path(p) for p in mod_info.get('installed', [])]
1173
1174  def get_code_under_test(self, module_name: str) -> List[str]:
1175    """Return code under test from module info."""
1176    mod_info = self.get_module_info(module_name)
1177    if not mod_info:
1178      atest_utils.colorful_print(
1179          '\nmodule %s cannot be found in module info, skip generating'
1180          ' coverage for it.' % module_name,
1181          constants.YELLOW,
1182      )
1183      return []
1184
1185    return mod_info.get('code_under_test', [])
1186
1187  def build_variants(self, info: Dict[str, Any]) -> List[str]:
1188    return info.get(constants.MODULE_SUPPORTED_VARIANTS, [])
1189
1190  def requires_device(self, info: Dict[str, Any]) -> bool:
1191
1192    if self.is_modern_robolectric_test(info):
1193      return False
1194    if self.is_ravenwood_test(info):
1195      return False
1196    if self.is_host_unit_test(info) and 'DEVICE' not in self.build_variants(
1197        info
1198    ):
1199      return False
1200
1201    return True
1202
1203
1204def _create_db(data_map: Dict[str, Dict[str, Any]], db_path: Path):
1205  """Create a Sqlite DB by writing to tempfile and move it to the right place.
1206
1207  Args:
1208      data_map: A dict where the key is table name and value is data itself.
1209      db_path: A Path pointing to the DB file.
1210  """
1211  if db_path.is_file():
1212    db_path.unlink()
1213
1214  with tempfile.NamedTemporaryFile(delete=False) as tmp_db:
1215    _create_db_in_path(data_map, tmp_db.name)
1216    shutil.move(tmp_db.name, db_path)
1217
1218    logging.debug('%s is created successfully.', db_path)
1219
1220
1221def _create_db_in_path(data_map: Dict[str, Dict[str, Any]], db_path: Path):
1222  """Create a Sqlite DB with multiple tables.
1223
1224  Args:
1225      data_map: A dict where the key is table name and value is data itself.
1226      db_path: A Path pointing to the DB file.
1227  """
1228  con = sqlite3.connect(db_path)
1229  with con:
1230    cur = con.cursor()
1231    for table, contents in data_map.items():
1232      cur.execute(f'CREATE TABLE {table}(key TEXT PRIMARY KEY, value TEXT)')
1233
1234      data = []
1235      for k, v in contents.items():
1236        data.append({'key': k, 'value': json.dumps(v)})
1237      cur.executemany(f'INSERT INTO {table} VALUES(:key, :value)', data)
1238
1239
1240def _create_json(data_map: Dict[str, Any], json_path: Path):
1241  """Write content onto a JSON file.
1242
1243  Args:
1244      data_map: A dict where the key is table name and value is data itself.
1245      json_path: A Path pointing to the JSON file.
1246  """
1247  if json_path.is_file():
1248    json_path.unlink()
1249
1250  with tempfile.NamedTemporaryFile(delete=False) as temp_json:
1251    with open(temp_json.name, 'w', encoding='utf-8') as _temp:
1252      json.dump(data_map, _temp, indent=0)
1253    shutil.move(temp_json.name, json_path)
1254
1255    logging.debug('%s is created successfully.', json_path)
1256
1257
1258def _save_data_async(function: Callable, contents: Any, target_path: Path):
1259  """Save contents to a static file in asynchronized manner."""
1260  atest_utils.run_multi_proc(
1261      func=function,
1262      args=[contents, target_path],
1263      # We set `daemon` to `False` to make sure that Atest doesn't exit before
1264      # writing the cache file.
1265      daemon=False,
1266  )
1267
1268
1269def merge_soong_info(name_to_module_info, mod_bp_infos):
1270  """Merge the dependency and srcs in mod_bp_infos to name_to_module_info.
1271
1272  Args:
1273      name_to_module_info: Dict of module name to module info dict.
1274      mod_bp_infos: Dict of module name to bp's module info dict.
1275
1276  Returns:
1277      Dict of updated name_to_module_info.
1278  """
1279  merge_items = [
1280      constants.MODULE_DEPENDENCIES,
1281      constants.MODULE_SRCS,
1282      constants.MODULE_LIBS,
1283      constants.MODULE_STATIC_LIBS,
1284      constants.MODULE_STATIC_DEPS,
1285      constants.MODULE_PATH,
1286  ]
1287  for module_name, dep_info in mod_bp_infos.items():
1288    mod_info = name_to_module_info.setdefault(module_name, {})
1289    for merge_item in merge_items:
1290      dep_info_values = dep_info.get(merge_item, [])
1291      mod_info_values = mod_info.get(merge_item, [])
1292      mod_info_values.extend(dep_info_values)
1293      mod_info_values.sort()
1294      # deduplicate values just in case.
1295      mod_info_values = list(dict.fromkeys(mod_info_values))
1296      name_to_module_info[module_name][merge_item] = mod_info_values
1297  return name_to_module_info
1298
1299
1300def _add_missing_variant_modules(name_to_module_info: Dict[str, Module]):
1301  missing_modules = {}
1302
1303  # Android's build system automatically adds a suffix for some build module
1304  # variants. For example, a module-info entry for a module originally named
1305  # 'HelloWorldTest' might appear as 'HelloWorldTest_32' and which Atest would
1306  # not be able to find. We add such entries if not already present so they
1307  # can be looked up using their declared module name.
1308  for mod_name, mod_info in name_to_module_info.items():
1309    declared_module_name = mod_info.get(constants.MODULE_NAME, mod_name)
1310    if declared_module_name in name_to_module_info:
1311      continue
1312    missing_modules.setdefault(declared_module_name, mod_info)
1313
1314  name_to_module_info.update(missing_modules)
1315
1316
1317def contains_same_mainline_modules(
1318    mainline_modules: Set[str], module_lists: Set[str]
1319):
1320  """Check if mainline modules listed on command line is
1321
1322  the same set as config.
1323
1324  Args:
1325      mainline_modules: A list of mainline modules from triggered test.
1326      module_lists: A list of concatenate mainline module string from test
1327        configs.
1328
1329  Returns
1330      True if the set mainline modules from triggered test is in the test
1331        configs.
1332  """
1333  for module_string in module_lists:
1334    if mainline_modules == set(module_string.split('+')):
1335      return True
1336  return False
1337
1338
1339def get_path_to_module_info(name_to_module_info):
1340  """Return the path_to_module_info dict.
1341
1342  Args:
1343      name_to_module_info: Dict of module name to module info dict.
1344
1345  Returns:
1346      Dict of module path to module info dict.
1347  """
1348  path_to_module_info = {}
1349  for mod_name, mod_info in name_to_module_info.items():
1350    # Cross-compiled and multi-arch modules actually all belong to
1351    # a single target so filter out these extra modules.
1352    if mod_name != mod_info.get(constants.MODULE_NAME, ''):
1353      continue
1354    for path in mod_info.get(constants.MODULE_PATH, []):
1355      mod_info[constants.MODULE_NAME] = mod_name
1356      # There could be multiple modules in a path.
1357      if path in path_to_module_info:
1358        path_to_module_info[path].append(mod_info)
1359      else:
1360        path_to_module_info[path] = [mod_info]
1361  return path_to_module_info
1362
1363
1364def _get_module_names(path_to_module_info, rel_module_path):
1365  """Get the modules that all have module_path.
1366
1367  Args:
1368      path_to_module_info: Dict of path to module info.
1369      rel_module_path: path of module in module-info.json.
1370
1371  Returns:
1372      List of module names.
1373  """
1374  return [
1375      m.get(constants.MODULE_NAME)
1376      for m in path_to_module_info.get(rel_module_path, [])
1377  ]
1378
1379
1380def _get_robolectric_test_name(
1381    name_to_module_info: Dict[str, Dict],
1382    path_to_module_info: Dict[str, Dict],
1383    info: Dict[str, Any],
1384) -> str:
1385  """Returns runnable robolectric module name.
1386
1387  This method is for legacy robolectric tests and returns one of associated
1388  modules. The pattern is determined by the amount of shards:
1389
1390  10 shards:
1391      FooTests -> RunFooTests0, RunFooTests1 ... RunFooTests9
1392  No shard:
1393      FooTests -> RunFooTests
1394
1395  Arg:
1396      name_to_module_info: Dict of name to module info.
1397      path_to_module_info: Dict of path to module info.
1398      info: Dict of module info to check.
1399
1400  Returns:
1401      String of the first-matched associated module that belongs to the
1402      actual robolectric module, None if nothing has been found.
1403  """
1404  if not info:
1405    return ''
1406  module_paths = info.get(constants.MODULE_PATH, [])
1407  if not module_paths:
1408    return ''
1409  filtered_module_names = [
1410      name
1411      for name in _get_module_names(path_to_module_info, module_paths[0])
1412      if name.startswith('Run')
1413  ]
1414  return next(
1415      (
1416          name
1417          for name in filtered_module_names
1418          if ModuleInfo.is_legacy_robolectric_class(
1419              name_to_module_info.get(name)
1420          )
1421      ),
1422      '',
1423  )
1424
1425
1426def _is_legacy_robolectric_test(
1427    name_to_module_info: Dict[str, Dict],
1428    path_to_module_info: Dict[str, Dict],
1429    info: Dict[str, Any],
1430) -> bool:
1431  """Return whether the module_name is a legacy Robolectric test"""
1432  if ModuleInfo.is_tradefed_testable_module(info):
1433    return False
1434  return bool(
1435      _get_robolectric_test_name(name_to_module_info, path_to_module_info, info)
1436  )
1437
1438
1439def get_module_info_target() -> str:
1440  """Get module info target name for soong_ui.bash"""
1441  build_top = atest_utils.get_build_top()
1442  module_info_path = atest_utils.get_product_out(_MODULE_INFO)
1443  if module_info_path.is_relative_to(build_top):
1444    return str(module_info_path.relative_to(build_top))
1445
1446  logging.debug('Found customized OUT_DIR!')
1447  return str(module_info_path)
1448
1449
1450def build():
1451  """Build module-info.json"""
1452  logging.debug(
1453      'Generating %s - this is required for initial runs or forced rebuilds.',
1454      _MODULE_INFO,
1455  )
1456  build_start = time.time()
1457  if not atest_utils.build([get_module_info_target()]):
1458    sys.exit(ExitCode.BUILD_FAILURE)
1459
1460  metrics.LocalDetectEvent(
1461      detect_type=DetectType.ONLY_BUILD_MODULE_INFO,
1462      result=int(time.time() - build_start),
1463  )
1464
1465
1466def _is_testable_module(
1467    name_to_module_info: Dict[str, Dict],
1468    path_to_module_info: Dict[str, Dict],
1469    info: Dict[str, Any],
1470) -> bool:
1471  """Check if module is something we can test.
1472
1473  A module is testable if:
1474    - it's a tradefed testable module, or
1475    - it's a Mobly module, or
1476    - it's a robolectric module (or shares path with one).
1477
1478  Args:
1479      name_to_module_info: Dict of name to module info.
1480      path_to_module_info: Dict of path to module info.
1481      info: Dict of module info to check.
1482
1483  Returns:
1484      True if we can test this module, False otherwise.
1485  """
1486  if not info or not info.get(constants.MODULE_NAME):
1487    return False
1488  if ModuleInfo.is_tradefed_testable_module(info):
1489    return True
1490  if ModuleInfo.is_mobly_module(info):
1491    return True
1492  if _is_legacy_robolectric_test(
1493      name_to_module_info, path_to_module_info, info
1494  ):
1495    return True
1496  return False
1497
1498
1499def _get_testable_modules(
1500    name_to_module_info: Dict[str, Dict],
1501    path_to_module_info: Dict[str, Dict],
1502    suite: str = None,
1503    index_path: Path = None,
1504):
1505  """Return testable modules of the given suite name."""
1506  suite_to_modules = _get_suite_to_modules(
1507      name_to_module_info, path_to_module_info, index_path
1508  )
1509
1510  return _filter_modules_by_suite(suite_to_modules, suite)
1511
1512
1513def _get_suite_to_modules(
1514    name_to_module_info: Dict[str, Dict],
1515    path_to_module_info: Dict[str, Dict],
1516    index_path: Path = None,
1517) -> Dict[str, Set[str]]:
1518  """Map suite and its modules.
1519
1520  Args:
1521      name_to_module_info: Dict of name to module info.
1522      path_to_module_info: Dict of path to module info.
1523      index_path: Path of the stored content.
1524
1525  Returns:
1526      Dict of suite and testable modules mapping.
1527  """
1528  suite_to_modules = {}
1529
1530  for _, info in name_to_module_info.items():
1531    if _is_testable_module(name_to_module_info, path_to_module_info, info):
1532      testable_module = info.get(constants.MODULE_NAME)
1533      suites = (
1534          info.get('compatibility_suites')
1535          if info.get('compatibility_suites')
1536          else ['null-suite']
1537      )
1538
1539      for suite in suites:
1540        suite_to_modules.setdefault(suite, set()).add(testable_module)
1541
1542  if index_path:
1543    _index_testable_modules(suite_to_modules, index_path)
1544
1545  return suite_to_modules
1546
1547
1548def _filter_modules_by_suite(
1549    suite_to_modules: Dict[str, Set[str]],
1550    suite: str = None,
1551) -> Set[str]:
1552  """Return modules of the given suite name."""
1553  if suite:
1554    return suite_to_modules.get(suite)
1555
1556  return {mod for mod_set in suite_to_modules.values() for mod in mod_set}
1557
1558
1559def _index_testable_modules(contents: Any, index_path: Path):
1560  """Dump testable modules.
1561
1562  Args:
1563      content: An object that will be written to the index file.
1564      index_path: Path to the saved index file.
1565  """
1566  logging.debug(
1567      r'Indexing testable modules... '
1568      r'(This is required whenever module-info.json '
1569      r'was rebuilt.)'
1570  )
1571  index_path.parent.mkdir(parents=True, exist_ok=True)
1572  with tempfile.NamedTemporaryFile(delete=False) as cache:
1573    try:
1574      pickle.dump(contents, cache, protocol=2)
1575      shutil.move(cache.name, index_path)
1576      logging.debug('%s is created successfully.', index_path)
1577    except IOError:
1578      atest_utils.print_and_log_error('Failed in dumping %s', cache)
1579      os.remove(cache.name)
1580
1581
1582class SqliteDict(collections.abc.Mapping):
1583  """A class that loads a Sqlite DB as a dictionary-like object.
1584
1585  Args:
1586      conn: A connection to the Sqlite database.
1587      table_name: A string the table name.
1588  """
1589
1590  def __init__(self, conn: sqlite3.Connection, table_name: str):
1591    """Initialize the SqliteDict instance."""
1592    self.conn = conn
1593    self.table = table_name
1594
1595  def __iter__(self) -> str:
1596    """Iterate over the keys in the SqliteDict."""
1597    for key in self._load_key_rows():
1598      yield key[0]
1599
1600  def _load_key_rows(self) -> Set[str]:
1601    """Load the key rows from the database table."""
1602    results = self.conn.execute(f'SELECT key FROM {self.table}').fetchall()
1603    return set(results)
1604
1605  def __len__(self) -> int:
1606    """Get the size of key-value pairs in the SqliteDict."""
1607    return len(self._load_key_rows())
1608
1609  def __getitem__(self, key) -> Dict[str, Any]:
1610    """Get the value associated with the specified key."""
1611    result = self.conn.execute(
1612        f'SELECT value FROM {self.table} WHERE key = ?', (key,)
1613    ).fetchone()
1614    if result:
1615      return json.loads(result[0])
1616    raise KeyError(f'Bad key: {key}')
1617
1618  def items(self) -> Tuple[str, Dict[str, Any]]:
1619    """Iterate over the key-value pairs in the SqliteDict."""
1620    for key in self:
1621      value = self[key]
1622      yield key, value
1623