# Copyright 2018, The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Module Info class used to hold cached module-info.json.""" # pylint: disable=too-many-lines from __future__ import annotations import collections import json import logging import os from pathlib import Path import pickle import re import shutil import sqlite3 import sys import tempfile import time from typing import Any, Callable, Dict, List, Set, Tuple from atest import atest_utils from atest import constants from atest.atest_enum import DetectType, ExitCode from atest.metrics import metrics # JSON file generated by build system that lists all buildable targets. _MODULE_INFO = 'module-info.json' # JSON file generated by build system that lists dependencies for java. _JAVA_DEP_INFO = 'module_bp_java_deps.json' # JSON file generated by build system that lists dependencies for cc. _CC_DEP_INFO = 'module_bp_cc_deps.json' # JSON file generated by atest merged the content from module-info, # module_bp_java_deps.json, and module_bp_cc_deps. _MERGED_INFO = 'atest_merged_dep.json' _DB_VERSION = 2 _DB_NAME = f'module-info.{_DB_VERSION}.db' _NAME_MODULE_TABLE = 'modules' _PATH_MODULE_TABLE = 'path_modules' Module = Dict[str, Any] def load_from_file( module_file: Path = None, force_build: bool = False, ) -> ModuleInfo: """Factory method that initializes ModuleInfo from the build-generated JSON file """ loader = Loader( module_file=module_file, force_build=force_build, need_merge_fn=lambda: False, ) mi = loader.load() return mi def load_from_dict(name_to_module_info: Dict[str, Any]) -> ModuleInfo: """Factory method that initializes ModuleInfo from a dictionary.""" path_to_module_info = get_path_to_module_info(name_to_module_info) return ModuleInfo( name_to_module_info=name_to_module_info, path_to_module_info=path_to_module_info, get_testable_modules=lambda s: _get_testable_modules( name_to_module_info, path_to_module_info, s ), ) def create_empty() -> ModuleInfo: """Factory method that initializes an empty ModuleInfo.""" return ModuleInfo() def load( force_build: bool = False, sqlite_module_cache: bool = False ) -> ModuleInfo: """Factory method that initializes ModuleInfo from the build-generated JSON or Sqlite file. """ mod_start = time.time() loader = Loader( force_build=force_build, sqlite_module_cache=sqlite_module_cache ) mod_stop = time.time() - mod_start metrics.LocalDetectEvent( detect_type=DetectType.MODULE_INFO_INIT_MS, result=int(mod_stop * 1000) ) return loader.load(save_timestamps=True) def metrics_timer(func): """Decorator method for sending data to metrics.""" def wrapper(*args, **kwargs): start = time.time() result = func(*args, **kwargs) elapsed_time = int(time.time() - start) metrics.LocalDetectEvent( detect_type=DetectType.TESTABLE_MODULES, result=elapsed_time ) return result return wrapper class Loader: """Class that handles load and merge processes.""" def __init__( self, module_file: Path = None, force_build: bool = False, sqlite_module_cache: bool = False, need_merge_fn: Callable = None, ): logging.debug( 'Creating module info loader object with module_file: %s, force_build:' ' %s, sqlite_module_cache: %s, need_merge_fn: %s', module_file, force_build, sqlite_module_cache, need_merge_fn, ) self.java_dep_path = atest_utils.get_build_out_dir('soong', _JAVA_DEP_INFO) self.cc_dep_path = atest_utils.get_build_out_dir('soong', _CC_DEP_INFO) self.merged_dep_path = atest_utils.get_product_out(_MERGED_INFO) logging.debug( 'java_dep_path: %s, cc_dep_path: %s, merged_dep_path: %s', self.java_dep_path, self.cc_dep_path, self.merged_dep_path, ) self.sqlite_module_cache = sqlite_module_cache logging.debug('sqlite_module_cache: %s', sqlite_module_cache) if self.sqlite_module_cache: self.cache_file = atest_utils.get_product_out(_DB_NAME) self.save_cache_async = self._save_db_async self.load_from_cache = self._load_from_db else: self.cache_file = self.merged_dep_path self.save_cache_async = self._save_json_async self.load_from_cache = self._load_from_json if need_merge_fn: self.save_cache_async = lambda _, __: None self.update_merge_info = False self.module_index = atest_utils.get_index_path( f'suite-modules.{_DB_VERSION}.idx' ) self.module_index_proc = None logging.debug('module_index: %s', self.module_index) if module_file: self.mod_info_file_path = Path(module_file) self.load_module_info = self._load_module_info_from_file_wo_merging else: self.mod_info_file_path = atest_utils.get_product_out(_MODULE_INFO) if force_build: logging.debug('Triggering module info build by force build.') build() elif not self.mod_info_file_path.is_file(): logging.debug( 'Triggering module info build due to module info file path %s not' ' exist.', self.mod_info_file_path, ) build() self.update_merge_info = self.need_merge_module_info() self.load_module_info = self._load_module_info_file logging.debug( 'Executing load_module_info function %s', self.load_module_info ) self.name_to_module_info, self.path_to_module_info = self.load_module_info() logging.debug('Completed creating module info loader object') def load(self, save_timestamps: bool = False): logging.debug('Loading ModuleInfo. save_timestamps: %s', save_timestamps) if save_timestamps: atest_utils.run_multi_proc(func=atest_utils.save_build_files_timestamp) return ModuleInfo( name_to_module_info=self.name_to_module_info, path_to_module_info=self.path_to_module_info, mod_info_file_path=self.mod_info_file_path, get_testable_modules=self.get_testable_modules, ) def _load_module_info_file(self): """Load module-info.json file as ModuleInfo and merge related JSON files whenever required. Returns: Dict of module name to module info and dict of module path to module info. """ # +--------------+ +----------------------------------+ # | ModuleInfo() | | ModuleInfo(module_file=foo.json) | # +-------+------+ +----------------+-----------------+ # | module_info.build() | load # v V # +--------------------------+ +--------------------------+ # | module-info.json | | foo.json | # | module_bp_cc_deps.json | | module_bp_cc_deps.json | # | module_bp_java_deps.json | | module_bp_java_deps.json | # +--------------------------+ +--------------------------+ # | | # | _merge_soong_info() <--------------------+ # v # +============================+ # | $ANDROID_PRODUCT_OUT | # | /atest_merged_dep.json |--> load as module info. # +============================+ if not self.update_merge_info: return self.load_from_cache() name_modules, path_modules = self._load_from_json(merge=True) self.save_cache_async(name_modules, path_modules) self._save_testable_modules_async(name_modules, path_modules) return name_modules, path_modules def _load_module_info_from_file_wo_merging(self): """Load module-info.json as ModuleInfo without merging.""" name_modules = atest_utils.load_json_safely(self.mod_info_file_path) _add_missing_variant_modules(name_modules) return name_modules, get_path_to_module_info(name_modules) def _save_db_async( self, name_to_module_info: Dict[str, Any], path_to_module_info: Dict[str, Any], ): """Save data to a Sqlite database in parallel.""" data_map = { _NAME_MODULE_TABLE: name_to_module_info, _PATH_MODULE_TABLE: path_to_module_info, } _save_data_async( function=_create_db, contents=data_map, target_path=self.cache_file, ) def _load_from_db(self) -> Tuple[Dict[str, Any], Dict[str, Any]]: """Return a tuple of dicts by from SqliteDict.""" conn = sqlite3.connect(self.cache_file) with conn: name_to_module_info = SqliteDict(conn, _NAME_MODULE_TABLE) path_to_module_info = SqliteDict(conn, _PATH_MODULE_TABLE) return name_to_module_info, path_to_module_info def _save_json_async(self, name_to_module_info: Dict[str, Any], _): """Save data to a JSON format in parallel.""" _save_data_async( function=_create_json, contents=name_to_module_info, target_path=self.cache_file, ) def _load_from_json(self, merge: bool = False) -> Tuple[Dict, Dict]: """Load or merge module info from json file. Args: merge: Boolean whether to merge build system infos. Returns: A tuple of (name_to_module_info, path_to_module_info). """ start = time.time() if merge: name_info = self._merge_build_system_infos( atest_utils.load_json_safely(self.mod_info_file_path) ) duration = time.time() - start logging.debug('Merging module info took %ss', duration) metrics.LocalDetectEvent( detect_type=DetectType.MODULE_MERGE_MS, result=int(duration * 1000) ) return name_info, get_path_to_module_info(name_info) name_info = atest_utils.load_json_safely(self.merged_dep_path) duration = time.time() - start logging.debug('Loading module info took %ss', duration) metrics.LocalDetectEvent( detect_type=DetectType.MODULE_LOAD_MS, result=int(duration * 1000) ) logging.debug('Loading %s as module-info.', self.merged_dep_path) return name_info, get_path_to_module_info(name_info) def _save_testable_modules_async( self, name_to_module_info: Dict[str, Any], path_to_module_info: Dict[str, Any], ): """Save testable modules in parallel.""" return atest_utils.run_multi_proc( func=_get_testable_modules, kwargs={ 'name_to_module_info': name_to_module_info, 'path_to_module_info': path_to_module_info, 'index_path': self.module_index, }, ) def need_merge_module_info(self): """Check if needed to regenerate the cache file. If the cache file is non-existent or testable module index is inexistent or older than any of the JSON files used to generate it, the cache file must re-generate. Returns: True when the cache file is older or non-existent, False otherwise. """ if not self.cache_file.is_file(): return True if not self.module_index.is_file(): return True # The dependency input files should be generated at this point. return any( self.cache_file.stat().st_mtime < f.stat().st_mtime for f in (self.mod_info_file_path, self.java_dep_path, self.cc_dep_path) ) def _merge_build_system_infos( self, name_to_module_info, java_bp_info_path=None, cc_bp_info_path=None ): """Merge the content of module-info.json and CC/Java dependency files to name_to_module_info. Args: name_to_module_info: Dict of module name to module info dict. java_bp_info_path: String of path to java dep file to load up. Used for testing. cc_bp_info_path: String of path to cc dep file to load up. Used for testing. Returns: Dict of updated name_to_module_info. """ # Merge _JAVA_DEP_INFO if not java_bp_info_path: java_bp_info_path = self.java_dep_path java_bp_infos = atest_utils.load_json_safely(java_bp_info_path) if java_bp_infos: logging.debug('Merging Java build info: %s', java_bp_info_path) name_to_module_info = merge_soong_info(name_to_module_info, java_bp_infos) # Merge _CC_DEP_INFO if not cc_bp_info_path: cc_bp_info_path = self.cc_dep_path cc_bp_infos = atest_utils.load_json_safely(cc_bp_info_path) if cc_bp_infos: logging.debug('Merging CC build info: %s', cc_bp_info_path) # CC's dep json format is different with java. # Below is the example content: # { # "clang": "${ANDROID_ROOT}/bin/clang", # "clang++": "${ANDROID_ROOT}/bin/clang++", # "modules": { # "ACameraNdkVendorTest": { # "path": [ # "frameworks/av/camera/ndk" # ], # "srcs": [ # "frameworks/tests/AImageVendorTest.cpp", # "frameworks/tests/ACameraManagerTest.cpp" # ], name_to_module_info = merge_soong_info( name_to_module_info, cc_bp_infos.get('modules', {}) ) # If $ANDROID_PRODUCT_OUT was not created in pyfakefs, simply return it # without dumping atest_merged_dep.json in real. # Adds the key into module info as a unique ID. for key, info in name_to_module_info.items(): info[constants.MODULE_INFO_ID] = key _add_missing_variant_modules(name_to_module_info) return name_to_module_info @metrics_timer def get_testable_modules(self, suite=None): """Return the testable modules of the given suite name. Atest does not index testable modules against compatibility_suites. When suite was given, or the index file was interrupted, always run _get_testable_modules() and re-index. Args: suite: A string of suite name. Returns: If suite is not given, return all the testable modules in module info, otherwise return only modules that belong to the suite. """ modules = set() if self.module_index.is_file(): modules = self.get_testable_modules_from_index(suite) # If the modules.idx does not exist or invalid for any reason, generate # a new one arbitrarily. if not modules: modules = self.get_testable_module_from_memory(suite) return modules def get_testable_modules_from_index(self, suite: str = None) -> Set[str]: """Return the testable modules of the given suite name.""" suite_to_modules = {} with open(self.module_index, 'rb') as cache: try: suite_to_modules = pickle.load(cache, encoding='utf-8') except UnicodeDecodeError: suite_to_modules = pickle.load(cache) # when module indexing was interrupted. except EOFError: pass return _filter_modules_by_suite(suite_to_modules, suite) def get_testable_module_from_memory(self, suite: str = None) -> Set[str]: """Return the testable modules of the given suite name.""" return _get_testable_modules( name_to_module_info=self.name_to_module_info, path_to_module_info=self.path_to_module_info, index_path=self.module_index, suite=suite, ) class ModuleInfo: """Class that offers fast/easy lookup for Module related details.""" def __init__( self, name_to_module_info: Dict[str, Any] = None, path_to_module_info: Dict[str, Any] = None, mod_info_file_path: Path = None, get_testable_modules: Callable = None, ): """Initialize the ModuleInfo object. Load up the module-info.json file and initialize the helper vars. Note that module-info.json does not contain all module dependencies, therefore, Atest needs to accumulate dependencies defined in bp files. Args: name_to_module_info: Dict of name to module info. path_to_module_info: Dict of path to module info. mod_info_file_path: Path of module-info.json. get_testable_modules: Function to get all testable modules. """ # +----------------------+ +----------------------------+ # | $ANDROID_PRODUCT_OUT | |$ANDROID_BUILD_TOP/out/soong| # | /module-info.json | | /module_bp_java_deps.json | # +-----------+----------+ +-------------+--------------+ # | _merge_soong_info() | # +------------------------------+ # | # v # +----------------------------+ +----------------------------+ # |tempfile.NamedTemporaryFile | |$ANDROID_BUILD_TOP/out/soong| # +-------------+--------------+ | /module_bp_cc_deps.json | # | +-------------+--------------+ # | _merge_soong_info() | # +-------------------------------+ # | # +-------| # v # +============================+ # | $ANDROID_PRODUCT_OUT | # | /atest_merged_dep.json |--> load as module info. # +============================+ self.root_dir = os.environ.get(constants.ANDROID_BUILD_TOP) self.name_to_module_info = name_to_module_info or {} self.path_to_module_info = path_to_module_info or {} self.mod_info_file_path = mod_info_file_path self._get_testable_modules = get_testable_modules def is_module(self, name): """Return True if name is a module, False otherwise.""" info = self.get_module_info(name) # From aosp/2293302 it started merging all modules' dependency in bp # even the module is not be exposed to make, and those modules could not # be treated as a build target using m. Only treat input name as module # if it also has the module_name attribute which means it could be a # build target for m. if info and info.get(constants.MODULE_NAME): return True return False def get_paths(self, name) -> list[str]: """Return paths of supplied module name, Empty list if non-existent.""" info = self.get_module_info(name) if info: return info.get(constants.MODULE_PATH, []) return [] def get_module_names(self, rel_module_path): """Get the modules that all have module_path. Args: rel_module_path: path of module in module-info.json Returns: List of module names. """ return _get_module_names(self.path_to_module_info, rel_module_path) def get_module_info(self, mod_name): """Return dict of info for given module name, None if non-existence.""" return self.name_to_module_info.get(mod_name) @staticmethod def is_suite_in_compatibility_suites(suite, mod_info): """Check if suite exists in the compatibility_suites of module-info. Args: suite: A string of suite name. mod_info: Dict of module info to check. Returns: True if it exists in mod_info, False otherwise. """ if not isinstance(mod_info, dict): return False return suite in mod_info.get(constants.MODULE_COMPATIBILITY_SUITES, []) def get_testable_modules(self, suite=None): return self._get_testable_modules(suite) @staticmethod def is_tradefed_testable_module(info: Dict[str, Any]) -> bool: """Check whether the module is a Tradefed executable test.""" if not info: return False if not info.get(constants.MODULE_INSTALLED, []): return False return ModuleInfo.has_test_config(info) @staticmethod def is_mobly_module(info: Dict[str, Any]) -> bool: """Check whether the module is a Mobly test. Note: Only python_test_host modules marked with a test_options tag of "mobly" is considered a Mobly module. Args: info: Dict of module info to check. Returns: True if this is a Mobly test module, False otherwise. """ return constants.MOBLY_TEST_OPTIONS_TAG in info.get( constants.MODULE_TEST_OPTIONS_TAGS, [] ) def is_testable_module(self, info: Dict[str, Any]) -> bool: """Check if module is something we can test. A module is testable if: - it's a tradefed testable module, or - it's a Mobly module, or - it's a robolectric module (or shares path with one). Args: info: Dict of module info to check. Returns: True if we can test this module, False otherwise. """ return _is_testable_module( self.name_to_module_info, self.path_to_module_info, info ) @staticmethod def has_test_config(info: Dict[str, Any]) -> bool: """Validate if this module has a test config. A module can have a test config in the following manner: - test_config be set in module-info.json. - Auto-generated config via the auto_test_config key in module-info.json. Args: info: Dict of module info to check. Returns: True if this module has a test config, False otherwise. """ return bool( info.get(constants.MODULE_TEST_CONFIG, []) or info.get('auto_test_config', []) ) def is_legacy_robolectric_test(self, info: Dict[str, Any]) -> bool: """Return whether the module_name is a legacy Robolectric test""" return _is_legacy_robolectric_test( self.name_to_module_info, self.path_to_module_info, info ) def get_robolectric_test_name(self, info: Dict[str, Any]) -> str: """Returns runnable robolectric module name. This method is for legacy robolectric tests and returns one of associated modules. The pattern is determined by the amount of shards: 10 shards: FooTests -> RunFooTests0, RunFooTests1 ... RunFooTests9 No shard: FooTests -> RunFooTests Arg: info: Dict of module info to check. Returns: String of the first-matched associated module that belongs to the actual robolectric module, None if nothing has been found. """ return _get_robolectric_test_name( self.name_to_module_info, self.path_to_module_info, info ) def is_robolectric_test(self, module_name): """Check if the given module is a robolectric test. Args: module_name: String of module to check. Returns: Boolean whether it's a robotest or not. """ if self.get_robolectric_type(module_name): return True return False def get_robolectric_type(self, module_name: str) -> int: """Check if the given module is a robolectric test and return type of it. Robolectric declaration is converting from Android.mk to Android.bp, and in the interim Atest needs to support testing both types of tests. The modern robolectric tests defined by 'android_robolectric_test' in an Android.bp file can can be run in Tradefed Test Runner: SettingsRoboTests -> Tradefed Test Runner Legacy tests defined in an Android.mk can only run with the 'make' way. SettingsRoboTests -> make RunSettingsRoboTests0 To determine whether the test is a modern/legacy robolectric test: 1. If the 'robolectric-test` in the compatibility_suites, it's a modern one, otherwise it's a legacy test. This is accurate since aosp/2308586 already set the test suite of `robolectric-test` for all `modern` Robolectric tests in Soong. 2. Traverse all modules share the module path. If one of the modules has a ROBOLECTRIC class, it's a legacy robolectric test. Args: module_name: String of module to check. Returns: 0: not a robolectric test. 1: a modern robolectric test(defined in Android.bp) 2: a legacy robolectric test(defined in Android.mk) """ info = self.get_module_info(module_name) if not info: return 0 # Some Modern mode Robolectric test has related module which compliant # with the Legacy Robolectric test. In this case, the Modern mode # Robolectric tests should be prior to the Legacy mode. if self.is_modern_robolectric_test(info): return constants.ROBOTYPE_MODERN if self.is_legacy_robolectric_test(info): return constants.ROBOTYPE_LEGACY return 0 def get_instrumentation_target_apps(self, module_name: str) -> Dict: """Return target APKs of an instrumentation test. Returns: A dict of target module and target APK(s). e.g. {"FooService": {"/path/to/the/FooService.apk"}} """ # 1. Determine the actual manifest filename from an Android.bp(if any) manifest = self.get_filepath_from_module(module_name, 'AndroidManifest.xml') bpfile = self.get_filepath_from_module(module_name, 'Android.bp') if bpfile.is_file(): bp_info = atest_utils.get_bp_content(bpfile, 'android_test') if not bp_info or not bp_info.get(module_name): return {} manifest = self.get_filepath_from_module( module_name, bp_info.get(module_name).get('manifest') ) xml_info = atest_utils.get_manifest_info(manifest) # 2. Translate package name to a module name. package = xml_info.get('package') target_package = xml_info.get('target_package') # Ensure it's an instrumentation test(excluding self-instrmented) if target_package and package != target_package: logging.debug('Found %s an instrumentation test.', module_name) metrics.LocalDetectEvent( detect_type=DetectType.FOUND_INSTRUMENTATION_TEST, result=1 ) target_module = self.get_target_module_by_pkg( package=target_package, search_from=manifest.parent ) if target_module: return self.get_artifact_map(target_module) return {} # pylint: disable=anomalous-backslash-in-string def get_target_module_by_pkg(self, package: str, search_from: Path) -> str: """Translate package name to the target module name. This method is dedicated to determine the target module by translating a package name. Phase 1: Find out possible manifest files among parent directories. Phase 2. Look for the defined package fits the given name, and ensure it is not a persistent app. Phase 3: Translate the manifest path to possible modules. A valid module must fulfill: 1. The 'class' type must be ['APPS']. 2. It is not a Robolectric test. Returns: A string of module name. """ xmls = [] for pth in search_from.parents: if pth == Path(self.root_dir): break for name in os.listdir(pth): if pth.joinpath(name).is_file(): match = re.match('.*AndroidManifest.*\.xml$', name) if match: xmls.append(os.path.join(pth, name)) possible_modules = [] for xml in xmls: rel_dir = str(Path(xml).relative_to(self.root_dir).parent) logging.debug('Looking for package "%s" in %s...', package, xml) xml_info = atest_utils.get_manifest_info(xml) if xml_info.get('package') == package: if xml_info.get('persistent'): logging.debug('%s is a persistent app.', package) continue for _m in self.path_to_module_info.get(rel_dir): possible_modules.append(_m) if possible_modules: for mod in possible_modules: name = mod.get('module_name') if mod.get('class') == ['APPS'] and not self.is_robolectric_test(name): return name return '' def get_artifact_map(self, module_name: str) -> Dict: """Get the installed APK path of the given module.""" target_mod_info = self.get_module_info(module_name) artifact_map = {} if target_mod_info: apks = set() artifacts = target_mod_info.get('installed') for artifact in artifacts: if Path(artifact).suffix == '.apk': apks.add(os.path.join(self.root_dir, artifact)) artifact_map.update({module_name: apks}) return artifact_map def is_auto_gen_test_config(self, module_name): """Check if the test config file will be generated automatically. Args: module_name: A string of the module name. Returns: True if the test config file will be generated automatically. """ if self.is_module(module_name): mod_info = self.get_module_info(module_name) auto_test_config = mod_info.get('auto_test_config', []) return auto_test_config and auto_test_config[0] return False @staticmethod def is_legacy_robolectric_class(info: Dict[str, Any]) -> bool: """Check if the class is `ROBOLECTRIC` This method is for legacy robolectric tests that the associated modules contain: 'class': ['ROBOLECTRIC'] Args: info: ModuleInfo to check. Returns: True if the attribute class in mod_info is ROBOLECTRIC, False otherwise. """ if info: module_classes = info.get(constants.MODULE_CLASS, []) return ( module_classes and module_classes[0] == constants.MODULE_CLASS_ROBOLECTRIC ) return False def is_native_test(self, module_name): """Check if the input module is a native test. Args: module_name: A string of the module name. Returns: True if the test is a native test, False otherwise. """ mod_info = self.get_module_info(module_name) return constants.MODULE_CLASS_NATIVE_TESTS in mod_info.get( constants.MODULE_CLASS, [] ) def has_mainline_modules( self, module_name: str, mainline_binaries: List[str] ) -> bool: """Check if the mainline modules are in module-info. Args: module_name: A string of the module name. mainline_binaries: A list of mainline module binaries. Returns: True if mainline_binaries is in module-info, False otherwise. """ mod_info = self.get_module_info(module_name) # Check 'test_mainline_modules' attribute of the module-info.json. mm_in_mf = mod_info.get(constants.MODULE_MAINLINE_MODULES, []) ml_modules_set = set(mainline_binaries) if mm_in_mf: return contains_same_mainline_modules(ml_modules_set, set(mm_in_mf)) for test_config in mod_info.get(constants.MODULE_TEST_CONFIG, []): # Check the value of 'mainline-param' in the test config. if not self.is_auto_gen_test_config(module_name): return contains_same_mainline_modules( ml_modules_set, atest_utils.get_mainline_param( os.path.join(self.root_dir, test_config) ), ) # Unable to verify mainline modules in an auto-gen test config. logging.debug( '%s is associated with an auto-generated test config.', module_name ) return True return False def get_filepath_from_module(self, module_name: str, filename: str) -> Path: """Return absolute path of the given module and filename.""" mod_path = self.get_paths(module_name) if mod_path: return Path(self.root_dir).joinpath(mod_path[0], filename) return Path() def get_module_dependency(self, module_name, depend_on=None): """Get the dependency sets for input module. Recursively find all the dependencies of the input module. Args: module_name: String of module to check. depend_on: The list of parent dependencies. Returns: Set of dependency modules. """ if not depend_on: depend_on = set() deps = set() mod_info = self.get_module_info(module_name) if not mod_info: return deps mod_deps = set(mod_info.get(constants.MODULE_DEPENDENCIES, [])) # Remove item in deps if it already in depend_on: mod_deps = mod_deps - depend_on deps = deps.union(mod_deps) for mod_dep in mod_deps: deps = deps.union( set( self.get_module_dependency( mod_dep, depend_on=depend_on.union(deps) ) ) ) return deps def get_install_module_dependency(self, module_name, depend_on=None): """Get the dependency set for the given modules with installed path. Args: module_name: String of module to check. depend_on: The list of parent dependencies. Returns: Set of dependency modules which has installed path. """ install_deps = set() deps = self.get_module_dependency(module_name, depend_on) logging.debug('%s depends on: %s', module_name, deps) for module in deps: mod_info = self.get_module_info(module) if mod_info and mod_info.get(constants.MODULE_INSTALLED, []): install_deps.add(module) logging.debug( 'modules %s required by %s were not installed', install_deps, module_name, ) return install_deps @staticmethod def is_unit_test(mod_info): """Return True if input module is unit test, False otherwise. Args: mod_info: ModuleInfo to check. Returns: True if input module is unit test, False otherwise. """ return mod_info.get(constants.MODULE_IS_UNIT_TEST, '') == 'true' def is_host_unit_test(self, info: Dict[str, Any]) -> bool: """Return True if input module is host unit test, False otherwise. Args: info: ModuleInfo to check. Returns: True if input module is host unit test, False otherwise. """ return self.is_tradefed_testable_module( info ) and self.is_suite_in_compatibility_suites('host-unit-tests', info) def is_modern_robolectric_test(self, info: Dict[str, Any]) -> bool: """Return whether 'robolectric-tests' is in 'compatibility_suites'.""" return self.is_tradefed_testable_module( info ) and self.is_robolectric_test_suite(info) def is_robolectric_test_suite(self, mod_info) -> bool: """Return True if 'robolectric-tests' in the compatibility_suites. Args: mod_info: ModuleInfo to check. Returns: True if the 'robolectric-tests' is in the compatibility_suites, False otherwise. """ return self.is_suite_in_compatibility_suites('robolectric-tests', mod_info) def is_ravenwood_test(self, info: Dict[str, Any]) -> bool: """Return whether 'ravenwood-tests' is in 'compatibility_suites'.""" return self.is_tradefed_testable_module( info ) and self.is_ravenwood_test_suite(info) def is_ravenwood_test_suite(self, mod_info) -> bool: """Return True if 'ravenwood-tests' in the compatibility_suites. Args: mod_info: ModuleInfo to check. Returns: True if the 'ravenwood-tests' is in the compatibility_suites, False otherwise. """ return self.is_suite_in_compatibility_suites('ravenwood-tests', mod_info) def is_device_driven_test(self, mod_info): """Return True if input module is device driven test, False otherwise. Args: mod_info: ModuleInfo to check. Returns: True if input module is device driven test, False otherwise. """ if self.is_robolectric_test_suite(mod_info): return False if self.is_ravenwood_test_suite(mod_info): return False return self.is_tradefed_testable_module( mod_info ) and 'DEVICE' in mod_info.get(constants.MODULE_SUPPORTED_VARIANTS, []) def is_host_driven_test(self, mod_info): """Return True if input module is host driven test, False otherwise. Args: mod_info: ModuleInfo to check. Returns: True if input module is host driven test, False otherwise. """ return self.is_tradefed_testable_module( mod_info ) and 'HOST' in mod_info.get(constants.MODULE_SUPPORTED_VARIANTS, []) def _any_module(self, _: Module) -> bool: return True def get_all_tests(self): """Get a list of all the module names which are tests.""" return self._get_all_modules(type_predicate=self.is_testable_module) def get_all_unit_tests(self): """Get a list of all the module names which are unit tests.""" return self._get_all_modules(type_predicate=ModuleInfo.is_unit_test) def get_all_host_unit_tests(self): """Get a list of all the module names which are host unit tests.""" return self._get_all_modules(type_predicate=self.is_host_unit_test) def get_all_device_driven_tests(self): """Get a list of all the module names which are device driven tests.""" return self._get_all_modules(type_predicate=self.is_device_driven_test) def _get_all_modules(self, type_predicate=None): """Get a list of all the module names that passed the predicate.""" modules = [] type_predicate = type_predicate or self._any_module for mod_name, mod_info in self.name_to_module_info.items(): if mod_info.get(constants.MODULE_NAME, '') == mod_name: if type_predicate(mod_info): modules.append(mod_name) return modules def get_modules_by_path_in_srcs( self, path: str, testable_modules_only: bool = False ) -> Set[str]: """Get the module name that the given path belongs to.(in 'srcs') Args: path: file path which is relative to ANDROID_BUILD_TOP. testable_modules_only: boolean flag which determines whether search testable modules only or not. Returns: A set of string for matched module names, empty set if nothing find. """ modules = set() for mod_name in ( self.get_testable_modules() if testable_modules_only else self.name_to_module_info.keys() ): m_info = self.get_module_info(mod_name) if m_info: for src in m_info.get(constants.MODULE_SRCS, []): if src in path: modules.add(mod_name) return modules def get_modules_by_path( self, path: str, testable_modules_only: bool = False ) -> set[str]: """Get the module names that the give path belongs to. Args: path: dir path for searching among `path` in module information. testable_modules_only: boolean flag which determines whether search testable modules only or not. Returns: A set of module names. """ modules = set() is_testable_module_fn = ( self.is_testable_module if testable_modules_only else lambda _: True ) m_infos = self.path_to_module_info.get(path) if m_infos: modules = { info.get(constants.MODULE_NAME) for info in m_infos if is_testable_module_fn(info) } return modules def get_modules_by_include_deps( self, deps: Set[str], testable_module_only: bool = False ) -> Set[str]: """Get the matched module names for the input dependencies. Args: deps: A set of string for dependencies. testable_module_only: Option if only want to get testable module. Returns: A set of matched module names for the input dependencies. """ modules = set() for mod_name in ( self.get_testable_modules() if testable_module_only else self.name_to_module_info.keys() ): mod_info = self.get_module_info(mod_name) if mod_info and deps.intersection( set(mod_info.get(constants.MODULE_DEPENDENCIES, [])) ): modules.add(mod_info.get(constants.MODULE_NAME)) return modules def get_installed_paths(self, module_name: str) -> List[Path]: """Return installed path from module info.""" mod_info = self.get_module_info(module_name) if not mod_info: return [] def _to_abs_path(p): if os.path.isabs(p): return Path(p) return Path(os.getenv(constants.ANDROID_BUILD_TOP), p) return [_to_abs_path(p) for p in mod_info.get('installed', [])] def get_code_under_test(self, module_name: str) -> List[str]: """Return code under test from module info.""" mod_info = self.get_module_info(module_name) if not mod_info: atest_utils.colorful_print( '\nmodule %s cannot be found in module info, skip generating' ' coverage for it.' % module_name, constants.YELLOW, ) return [] return mod_info.get('code_under_test', []) def build_variants(self, info: Dict[str, Any]) -> List[str]: return info.get(constants.MODULE_SUPPORTED_VARIANTS, []) def requires_device(self, info: Dict[str, Any]) -> bool: if self.is_modern_robolectric_test(info): return False if self.is_ravenwood_test(info): return False if self.is_host_unit_test(info) and 'DEVICE' not in self.build_variants( info ): return False return True def _create_db(data_map: Dict[str, Dict[str, Any]], db_path: Path): """Create a Sqlite DB by writing to tempfile and move it to the right place. Args: data_map: A dict where the key is table name and value is data itself. db_path: A Path pointing to the DB file. """ if db_path.is_file(): db_path.unlink() with tempfile.NamedTemporaryFile(delete=False) as tmp_db: _create_db_in_path(data_map, tmp_db.name) shutil.move(tmp_db.name, db_path) logging.debug('%s is created successfully.', db_path) def _create_db_in_path(data_map: Dict[str, Dict[str, Any]], db_path: Path): """Create a Sqlite DB with multiple tables. Args: data_map: A dict where the key is table name and value is data itself. db_path: A Path pointing to the DB file. """ con = sqlite3.connect(db_path) with con: cur = con.cursor() for table, contents in data_map.items(): cur.execute(f'CREATE TABLE {table}(key TEXT PRIMARY KEY, value TEXT)') data = [] for k, v in contents.items(): data.append({'key': k, 'value': json.dumps(v)}) cur.executemany(f'INSERT INTO {table} VALUES(:key, :value)', data) def _create_json(data_map: Dict[str, Any], json_path: Path): """Write content onto a JSON file. Args: data_map: A dict where the key is table name and value is data itself. json_path: A Path pointing to the JSON file. """ if json_path.is_file(): json_path.unlink() with tempfile.NamedTemporaryFile(delete=False) as temp_json: with open(temp_json.name, 'w', encoding='utf-8') as _temp: json.dump(data_map, _temp, indent=0) shutil.move(temp_json.name, json_path) logging.debug('%s is created successfully.', json_path) def _save_data_async(function: Callable, contents: Any, target_path: Path): """Save contents to a static file in asynchronized manner.""" atest_utils.run_multi_proc( func=function, args=[contents, target_path], # We set `daemon` to `False` to make sure that Atest doesn't exit before # writing the cache file. daemon=False, ) def merge_soong_info(name_to_module_info, mod_bp_infos): """Merge the dependency and srcs in mod_bp_infos to name_to_module_info. Args: name_to_module_info: Dict of module name to module info dict. mod_bp_infos: Dict of module name to bp's module info dict. Returns: Dict of updated name_to_module_info. """ merge_items = [ constants.MODULE_DEPENDENCIES, constants.MODULE_SRCS, constants.MODULE_LIBS, constants.MODULE_STATIC_LIBS, constants.MODULE_STATIC_DEPS, constants.MODULE_PATH, ] for module_name, dep_info in mod_bp_infos.items(): mod_info = name_to_module_info.setdefault(module_name, {}) for merge_item in merge_items: dep_info_values = dep_info.get(merge_item, []) mod_info_values = mod_info.get(merge_item, []) mod_info_values.extend(dep_info_values) mod_info_values.sort() # deduplicate values just in case. mod_info_values = list(dict.fromkeys(mod_info_values)) name_to_module_info[module_name][merge_item] = mod_info_values return name_to_module_info def _add_missing_variant_modules(name_to_module_info: Dict[str, Module]): missing_modules = {} # Android's build system automatically adds a suffix for some build module # variants. For example, a module-info entry for a module originally named # 'HelloWorldTest' might appear as 'HelloWorldTest_32' and which Atest would # not be able to find. We add such entries if not already present so they # can be looked up using their declared module name. for mod_name, mod_info in name_to_module_info.items(): declared_module_name = mod_info.get(constants.MODULE_NAME, mod_name) if declared_module_name in name_to_module_info: continue missing_modules.setdefault(declared_module_name, mod_info) name_to_module_info.update(missing_modules) def contains_same_mainline_modules( mainline_modules: Set[str], module_lists: Set[str] ): """Check if mainline modules listed on command line is the same set as config. Args: mainline_modules: A list of mainline modules from triggered test. module_lists: A list of concatenate mainline module string from test configs. Returns True if the set mainline modules from triggered test is in the test configs. """ for module_string in module_lists: if mainline_modules == set(module_string.split('+')): return True return False def get_path_to_module_info(name_to_module_info): """Return the path_to_module_info dict. Args: name_to_module_info: Dict of module name to module info dict. Returns: Dict of module path to module info dict. """ path_to_module_info = {} for mod_name, mod_info in name_to_module_info.items(): # Cross-compiled and multi-arch modules actually all belong to # a single target so filter out these extra modules. if mod_name != mod_info.get(constants.MODULE_NAME, ''): continue for path in mod_info.get(constants.MODULE_PATH, []): mod_info[constants.MODULE_NAME] = mod_name # There could be multiple modules in a path. if path in path_to_module_info: path_to_module_info[path].append(mod_info) else: path_to_module_info[path] = [mod_info] return path_to_module_info def _get_module_names(path_to_module_info, rel_module_path): """Get the modules that all have module_path. Args: path_to_module_info: Dict of path to module info. rel_module_path: path of module in module-info.json. Returns: List of module names. """ return [ m.get(constants.MODULE_NAME) for m in path_to_module_info.get(rel_module_path, []) ] def _get_robolectric_test_name( name_to_module_info: Dict[str, Dict], path_to_module_info: Dict[str, Dict], info: Dict[str, Any], ) -> str: """Returns runnable robolectric module name. This method is for legacy robolectric tests and returns one of associated modules. The pattern is determined by the amount of shards: 10 shards: FooTests -> RunFooTests0, RunFooTests1 ... RunFooTests9 No shard: FooTests -> RunFooTests Arg: name_to_module_info: Dict of name to module info. path_to_module_info: Dict of path to module info. info: Dict of module info to check. Returns: String of the first-matched associated module that belongs to the actual robolectric module, None if nothing has been found. """ if not info: return '' module_paths = info.get(constants.MODULE_PATH, []) if not module_paths: return '' filtered_module_names = [ name for name in _get_module_names(path_to_module_info, module_paths[0]) if name.startswith('Run') ] return next( ( name for name in filtered_module_names if ModuleInfo.is_legacy_robolectric_class( name_to_module_info.get(name) ) ), '', ) def _is_legacy_robolectric_test( name_to_module_info: Dict[str, Dict], path_to_module_info: Dict[str, Dict], info: Dict[str, Any], ) -> bool: """Return whether the module_name is a legacy Robolectric test""" if ModuleInfo.is_tradefed_testable_module(info): return False return bool( _get_robolectric_test_name(name_to_module_info, path_to_module_info, info) ) def get_module_info_target() -> str: """Get module info target name for soong_ui.bash""" build_top = atest_utils.get_build_top() module_info_path = atest_utils.get_product_out(_MODULE_INFO) if module_info_path.is_relative_to(build_top): return str(module_info_path.relative_to(build_top)) logging.debug('Found customized OUT_DIR!') return str(module_info_path) def build(): """Build module-info.json""" logging.debug( 'Generating %s - this is required for initial runs or forced rebuilds.', _MODULE_INFO, ) build_start = time.time() if not atest_utils.build([get_module_info_target()]): sys.exit(ExitCode.BUILD_FAILURE) metrics.LocalDetectEvent( detect_type=DetectType.ONLY_BUILD_MODULE_INFO, result=int(time.time() - build_start), ) def _is_testable_module( name_to_module_info: Dict[str, Dict], path_to_module_info: Dict[str, Dict], info: Dict[str, Any], ) -> bool: """Check if module is something we can test. A module is testable if: - it's a tradefed testable module, or - it's a Mobly module, or - it's a robolectric module (or shares path with one). Args: name_to_module_info: Dict of name to module info. path_to_module_info: Dict of path to module info. info: Dict of module info to check. Returns: True if we can test this module, False otherwise. """ if not info or not info.get(constants.MODULE_NAME): return False if ModuleInfo.is_tradefed_testable_module(info): return True if ModuleInfo.is_mobly_module(info): return True if _is_legacy_robolectric_test( name_to_module_info, path_to_module_info, info ): return True return False def _get_testable_modules( name_to_module_info: Dict[str, Dict], path_to_module_info: Dict[str, Dict], suite: str = None, index_path: Path = None, ): """Return testable modules of the given suite name.""" suite_to_modules = _get_suite_to_modules( name_to_module_info, path_to_module_info, index_path ) return _filter_modules_by_suite(suite_to_modules, suite) def _get_suite_to_modules( name_to_module_info: Dict[str, Dict], path_to_module_info: Dict[str, Dict], index_path: Path = None, ) -> Dict[str, Set[str]]: """Map suite and its modules. Args: name_to_module_info: Dict of name to module info. path_to_module_info: Dict of path to module info. index_path: Path of the stored content. Returns: Dict of suite and testable modules mapping. """ suite_to_modules = {} for _, info in name_to_module_info.items(): if _is_testable_module(name_to_module_info, path_to_module_info, info): testable_module = info.get(constants.MODULE_NAME) suites = ( info.get('compatibility_suites') if info.get('compatibility_suites') else ['null-suite'] ) for suite in suites: suite_to_modules.setdefault(suite, set()).add(testable_module) if index_path: _index_testable_modules(suite_to_modules, index_path) return suite_to_modules def _filter_modules_by_suite( suite_to_modules: Dict[str, Set[str]], suite: str = None, ) -> Set[str]: """Return modules of the given suite name.""" if suite: return suite_to_modules.get(suite) return {mod for mod_set in suite_to_modules.values() for mod in mod_set} def _index_testable_modules(contents: Any, index_path: Path): """Dump testable modules. Args: content: An object that will be written to the index file. index_path: Path to the saved index file. """ logging.debug( r'Indexing testable modules... ' r'(This is required whenever module-info.json ' r'was rebuilt.)' ) index_path.parent.mkdir(parents=True, exist_ok=True) with tempfile.NamedTemporaryFile(delete=False) as cache: try: pickle.dump(contents, cache, protocol=2) shutil.move(cache.name, index_path) logging.debug('%s is created successfully.', index_path) except IOError: atest_utils.print_and_log_error('Failed in dumping %s', cache) os.remove(cache.name) class SqliteDict(collections.abc.Mapping): """A class that loads a Sqlite DB as a dictionary-like object. Args: conn: A connection to the Sqlite database. table_name: A string the table name. """ def __init__(self, conn: sqlite3.Connection, table_name: str): """Initialize the SqliteDict instance.""" self.conn = conn self.table = table_name def __iter__(self) -> str: """Iterate over the keys in the SqliteDict.""" for key in self._load_key_rows(): yield key[0] def _load_key_rows(self) -> Set[str]: """Load the key rows from the database table.""" results = self.conn.execute(f'SELECT key FROM {self.table}').fetchall() return set(results) def __len__(self) -> int: """Get the size of key-value pairs in the SqliteDict.""" return len(self._load_key_rows()) def __getitem__(self, key) -> Dict[str, Any]: """Get the value associated with the specified key.""" result = self.conn.execute( f'SELECT value FROM {self.table} WHERE key = ?', (key,) ).fetchone() if result: return json.loads(result[0]) raise KeyError(f'Bad key: {key}') def items(self) -> Tuple[str, Dict[str, Any]]: """Iterate over the key-value pairs in the SqliteDict.""" for key in self: value = self[key] yield key, value