# Copyright 2018, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Module Info class used to hold cached module-info.json."""

# pylint: disable=too-many-lines
from __future__ import annotations

import collections
import functools
import json
import logging
import os
from pathlib import Path
import pickle
import re
import shutil
import sqlite3
import sys
import tempfile
import time
from typing import Any, Callable, Dict, List, Set, Tuple

from atest import atest_utils
from atest import constants
from atest.atest_enum import DetectType, ExitCode
from atest.metrics import metrics


# JSON file generated by build system that lists all buildable targets.
_MODULE_INFO = 'module-info.json'
# JSON file generated by build system that lists dependencies for java.
_JAVA_DEP_INFO = 'module_bp_java_deps.json'
# JSON file generated by build system that lists dependencies for cc.
_CC_DEP_INFO = 'module_bp_cc_deps.json'
# JSON file generated by atest merged the content from module-info,
# module_bp_java_deps.json, and module_bp_cc_deps.
48_MERGED_INFO = 'atest_merged_dep.json' 49_DB_VERSION = 2 50_DB_NAME = f'module-info.{_DB_VERSION}.db' 51_NAME_MODULE_TABLE = 'modules' 52_PATH_MODULE_TABLE = 'path_modules' 53 54 55Module = Dict[str, Any] 56 57 58def load_from_file( 59 module_file: Path = None, 60 force_build: bool = False, 61) -> ModuleInfo: 62 """Factory method that initializes ModuleInfo from the build-generated 63 64 JSON file 65 """ 66 loader = Loader( 67 module_file=module_file, 68 force_build=force_build, 69 need_merge_fn=lambda: False, 70 ) 71 72 mi = loader.load() 73 74 return mi 75 76 77def load_from_dict(name_to_module_info: Dict[str, Any]) -> ModuleInfo: 78 """Factory method that initializes ModuleInfo from a dictionary.""" 79 path_to_module_info = get_path_to_module_info(name_to_module_info) 80 return ModuleInfo( 81 name_to_module_info=name_to_module_info, 82 path_to_module_info=path_to_module_info, 83 get_testable_modules=lambda s: _get_testable_modules( 84 name_to_module_info, path_to_module_info, s 85 ), 86 ) 87 88 89def create_empty() -> ModuleInfo: 90 """Factory method that initializes an empty ModuleInfo.""" 91 return ModuleInfo() 92 93 94def load( 95 force_build: bool = False, sqlite_module_cache: bool = False 96) -> ModuleInfo: 97 """Factory method that initializes ModuleInfo from the build-generated 98 99 JSON or Sqlite file. 
100 """ 101 mod_start = time.time() 102 loader = Loader( 103 force_build=force_build, sqlite_module_cache=sqlite_module_cache 104 ) 105 mod_stop = time.time() - mod_start 106 metrics.LocalDetectEvent( 107 detect_type=DetectType.MODULE_INFO_INIT_MS, result=int(mod_stop * 1000) 108 ) 109 110 return loader.load(save_timestamps=True) 111 112 113def metrics_timer(func): 114 """Decorator method for sending data to metrics.""" 115 116 def wrapper(*args, **kwargs): 117 start = time.time() 118 result = func(*args, **kwargs) 119 elapsed_time = int(time.time() - start) 120 metrics.LocalDetectEvent( 121 detect_type=DetectType.TESTABLE_MODULES, result=elapsed_time 122 ) 123 return result 124 125 return wrapper 126 127 128class Loader: 129 """Class that handles load and merge processes.""" 130 131 def __init__( 132 self, 133 module_file: Path = None, 134 force_build: bool = False, 135 sqlite_module_cache: bool = False, 136 need_merge_fn: Callable = None, 137 ): 138 logging.debug( 139 'Creating module info loader object with module_file: %s, force_build:' 140 ' %s, sqlite_module_cache: %s, need_merge_fn: %s', 141 module_file, 142 force_build, 143 sqlite_module_cache, 144 need_merge_fn, 145 ) 146 self.java_dep_path = atest_utils.get_build_out_dir('soong', _JAVA_DEP_INFO) 147 self.cc_dep_path = atest_utils.get_build_out_dir('soong', _CC_DEP_INFO) 148 self.merged_dep_path = atest_utils.get_product_out(_MERGED_INFO) 149 logging.debug( 150 'java_dep_path: %s, cc_dep_path: %s, merged_dep_path: %s', 151 self.java_dep_path, 152 self.cc_dep_path, 153 self.merged_dep_path, 154 ) 155 156 self.sqlite_module_cache = sqlite_module_cache 157 logging.debug('sqlite_module_cache: %s', sqlite_module_cache) 158 if self.sqlite_module_cache: 159 self.cache_file = atest_utils.get_product_out(_DB_NAME) 160 self.save_cache_async = self._save_db_async 161 self.load_from_cache = self._load_from_db 162 else: 163 self.cache_file = self.merged_dep_path 164 self.save_cache_async = self._save_json_async 165 
class Loader:
  """Class that handles load and merge processes."""

  def __init__(
      self,
      module_file: Path | None = None,
      force_build: bool = False,
      sqlite_module_cache: bool = False,
      need_merge_fn: Callable | None = None,
  ):
    """Resolve input/cache paths and load the module info dictionaries.

    Args:
      module_file: Optional module info JSON file. When given it is loaded
        as-is without building or merging.
      force_build: Whether to trigger a module info build before loading.
        Ignored when module_file is given.
      sqlite_module_cache: Whether to cache in a Sqlite database instead of the
        merged JSON file.
      need_merge_fn: When truthy, saving the cache is turned into a no-op. The
        callable itself is not invoked in this constructor.
    """
    logging.debug(
        'Creating module info loader object with module_file: %s, force_build:'
        ' %s, sqlite_module_cache: %s, need_merge_fn: %s',
        module_file,
        force_build,
        sqlite_module_cache,
        need_merge_fn,
    )
    self.java_dep_path = atest_utils.get_build_out_dir('soong', _JAVA_DEP_INFO)
    self.cc_dep_path = atest_utils.get_build_out_dir('soong', _CC_DEP_INFO)
    self.merged_dep_path = atest_utils.get_product_out(_MERGED_INFO)
    logging.debug(
        'java_dep_path: %s, cc_dep_path: %s, merged_dep_path: %s',
        self.java_dep_path,
        self.cc_dep_path,
        self.merged_dep_path,
    )

    self.sqlite_module_cache = sqlite_module_cache
    logging.debug('sqlite_module_cache: %s', sqlite_module_cache)
    # Pick the cache backend once; the rest of the class only talks to
    # cache_file / save_cache_async / load_from_cache.
    if self.sqlite_module_cache:
      self.cache_file = atest_utils.get_product_out(_DB_NAME)
      self.save_cache_async = self._save_db_async
      self.load_from_cache = self._load_from_db
    else:
      self.cache_file = self.merged_dep_path
      self.save_cache_async = self._save_json_async
      self.load_from_cache = self._load_from_json

    if need_merge_fn:
      self.save_cache_async = lambda _, __: None

    self.update_merge_info = False
    self.module_index = atest_utils.get_index_path(
        f'suite-modules.{_DB_VERSION}.idx'
    )
    self.module_index_proc = None
    logging.debug('module_index: %s', self.module_index)

    if module_file:
      self.mod_info_file_path = Path(module_file)
      self.load_module_info = self._load_module_info_from_file_wo_merging
    else:
      self.mod_info_file_path = atest_utils.get_product_out(_MODULE_INFO)
      if force_build:
        logging.debug('Triggering module info build by force build.')
        build()
      elif not self.mod_info_file_path.is_file():
        logging.debug(
            'Triggering module info build due to module info file path %s not'
            ' exist.',
            self.mod_info_file_path,
        )
        build()

      # Must run after any build above so that timestamps are up to date.
      self.update_merge_info = self.need_merge_module_info()
      self.load_module_info = self._load_module_info_file

    logging.debug(
        'Executing load_module_info function %s', self.load_module_info
    )
    self.name_to_module_info, self.path_to_module_info = self.load_module_info()

    logging.debug('Completed creating module info loader object')

  def load(self, save_timestamps: bool = False):
    """Wrap the already-loaded dictionaries in a ModuleInfo.

    Args:
      save_timestamps: Whether to also save the build file timestamps via
        atest_utils.run_multi_proc.

    Returns:
      A ModuleInfo that shares this loader's dictionaries and uses this
      loader's get_testable_modules.
    """
    logging.debug('Loading ModuleInfo. save_timestamps: %s', save_timestamps)
    if save_timestamps:
      atest_utils.run_multi_proc(func=atest_utils.save_build_files_timestamp)

    return ModuleInfo(
        name_to_module_info=self.name_to_module_info,
        path_to_module_info=self.path_to_module_info,
        mod_info_file_path=self.mod_info_file_path,
        get_testable_modules=self.get_testable_modules,
    )

  def _load_module_info_file(self):
    """Load module-info.json file as ModuleInfo and merge related JSON files whenever required.

    Returns:
      Dict of module name to module info and dict of module path to module
      info.
    """
    # +--------------+                  +----------------------------------+
    # | ModuleInfo() |                  | ModuleInfo(module_file=foo.json) |
    # +-------+------+                  +----------------+-----------------+
    #         | module_info.build()                      | load
    #         v                                          V
    # +--------------------------+         +--------------------------+
    # | module-info.json         |         | foo.json                 |
    # | module_bp_cc_deps.json   |         | module_bp_cc_deps.json   |
    # | module_bp_java_deps.json |         | module_bp_java_deps.json |
    # +--------------------------+         +--------------------------+
    #         |                                          |
    #         | _merge_soong_info() <--------------------+
    #         v
    # +============================+
    # |  $ANDROID_PRODUCT_OUT      |
    # |    /atest_merged_dep.json  |--> load as module info.
    # +============================+
    if not self.update_merge_info:
      return self.load_from_cache()

    # The cache is stale: merge from the JSON inputs, then refresh both the
    # cache and the testable-module index.
    name_modules, path_modules = self._load_from_json(merge=True)
    self.save_cache_async(name_modules, path_modules)
    self._save_testable_modules_async(name_modules, path_modules)

    return name_modules, path_modules

  def _load_module_info_from_file_wo_merging(self):
    """Load module-info.json as ModuleInfo without merging."""
    name_modules = atest_utils.load_json_safely(self.mod_info_file_path)
    _add_missing_variant_modules(name_modules)

    return name_modules, get_path_to_module_info(name_modules)

  def _save_db_async(
      self,
      name_to_module_info: Dict[str, Any],
      path_to_module_info: Dict[str, Any],
  ):
    """Save data to a Sqlite database in parallel."""
    data_map = {
        _NAME_MODULE_TABLE: name_to_module_info,
        _PATH_MODULE_TABLE: path_to_module_info,
    }
    _save_data_async(
        function=_create_db,
        contents=data_map,
        target_path=self.cache_file,
    )

  def _load_from_db(self) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Return a tuple of SqliteDict objects backed by the cache database."""
    conn = sqlite3.connect(self.cache_file)
    # `with conn` scopes a transaction only; it does not close the connection,
    # which the returned SqliteDict objects keep using.
    with conn:
      name_to_module_info = SqliteDict(conn, _NAME_MODULE_TABLE)
      path_to_module_info = SqliteDict(conn, _PATH_MODULE_TABLE)

      return name_to_module_info, path_to_module_info

  def _save_json_async(self, name_to_module_info: Dict[str, Any], _):
    """Save data to a JSON format in parallel."""
    _save_data_async(
        function=_create_json,
        contents=name_to_module_info,
        target_path=self.cache_file,
    )

  def _load_from_json(self, merge: bool = False) -> Tuple[Dict, Dict]:
    """Load or merge module info from json file.

    The elapsed time is reported as MODULE_MERGE_MS (merge) or MODULE_LOAD_MS
    (plain load), in milliseconds.

    Args:
      merge: Boolean whether to merge build system infos.

    Returns:
      A tuple of (name_to_module_info, path_to_module_info).
    """
    start = time.time()
    if merge:
      name_info = self._merge_build_system_infos(
          atest_utils.load_json_safely(self.mod_info_file_path)
      )
      duration = time.time() - start
      logging.debug('Merging module info took %ss', duration)
      metrics.LocalDetectEvent(
          detect_type=DetectType.MODULE_MERGE_MS, result=int(duration * 1000)
      )

      return name_info, get_path_to_module_info(name_info)

    name_info = atest_utils.load_json_safely(self.merged_dep_path)
    duration = time.time() - start
    logging.debug('Loading module info took %ss', duration)
    metrics.LocalDetectEvent(
        detect_type=DetectType.MODULE_LOAD_MS, result=int(duration * 1000)
    )
    logging.debug('Loading %s as module-info.', self.merged_dep_path)

    return name_info, get_path_to_module_info(name_info)

  def _save_testable_modules_async(
      self,
      name_to_module_info: Dict[str, Any],
      path_to_module_info: Dict[str, Any],
  ):
    """Save testable modules in parallel."""
    return atest_utils.run_multi_proc(
        func=_get_testable_modules,
        kwargs={
            'name_to_module_info': name_to_module_info,
            'path_to_module_info': path_to_module_info,
            'index_path': self.module_index,
        },
    )

  def need_merge_module_info(self):
    """Check if needed to regenerate the cache file.

    If the cache file or the testable module index does not exist, or the
    cache file is older than any of the JSON files used to generate it, the
    cache file must be re-generated.

    Returns:
      True when the cache file is older or non-existent, False otherwise.
    """
    if not self.cache_file.is_file():
      return True

    if not self.module_index.is_file():
      return True

    # The dependency input files should be generated at this point.
    cache_mtime = self.cache_file.stat().st_mtime
    return any(
        cache_mtime < f.stat().st_mtime
        for f in (self.mod_info_file_path, self.java_dep_path, self.cc_dep_path)
    )

  def _merge_build_system_infos(
      self, name_to_module_info, java_bp_info_path=None, cc_bp_info_path=None
  ):
    """Merge the content of module-info.json and CC/Java dependency files to name_to_module_info.

    Args:
      name_to_module_info: Dict of module name to module info dict.
      java_bp_info_path: String of path to java dep file to load up. Used for
        testing.
      cc_bp_info_path: String of path to cc dep file to load up. Used for
        testing.

    Returns:
      Dict of updated name_to_module_info.
    """
    # Merge _JAVA_DEP_INFO
    if not java_bp_info_path:
      java_bp_info_path = self.java_dep_path
    java_bp_infos = atest_utils.load_json_safely(java_bp_info_path)
    if java_bp_infos:
      logging.debug('Merging Java build info: %s', java_bp_info_path)
      name_to_module_info = merge_soong_info(name_to_module_info, java_bp_infos)
    # Merge _CC_DEP_INFO
    if not cc_bp_info_path:
      cc_bp_info_path = self.cc_dep_path
    cc_bp_infos = atest_utils.load_json_safely(cc_bp_info_path)
    if cc_bp_infos:
      logging.debug('Merging CC build info: %s', cc_bp_info_path)
      # CC's dep json format is different with java.
      # Below is the example content:
      # {
      #   "clang": "${ANDROID_ROOT}/bin/clang",
      #   "clang++": "${ANDROID_ROOT}/bin/clang++",
      #   "modules": {
      #       "ACameraNdkVendorTest": {
      #           "path": [
      #                   "frameworks/av/camera/ndk"
      #           ],
      #           "srcs": [
      #                   "frameworks/tests/AImageVendorTest.cpp",
      #                   "frameworks/tests/ACameraManagerTest.cpp"
      #           ],
      name_to_module_info = merge_soong_info(
          name_to_module_info, cc_bp_infos.get('modules', {})
      )
    # If $ANDROID_PRODUCT_OUT was not created in pyfakefs, simply return it
    # without dumping atest_merged_dep.json in real.
    # NOTE(review): no early return exists at this point in the visible code;
    # the comment above may predate the current implementation -- confirm.

    # Adds the key into module info as a unique ID.
    for key, info in name_to_module_info.items():
      info[constants.MODULE_INFO_ID] = key

    _add_missing_variant_modules(name_to_module_info)

    return name_to_module_info

  @metrics_timer
  def get_testable_modules(self, suite=None):
    """Return the testable modules of the given suite name.

    Atest does not index testable modules against compatibility_suites. When
    suite was given, or the index file was interrupted, always run
    _get_testable_modules() and re-index.

    Args:
      suite: A string of suite name.

    Returns:
      If suite is not given, return all the testable modules in module
      info, otherwise return only modules that belong to the suite.
    """
    modules = set()

    if self.module_index.is_file():
      modules = self.get_testable_modules_from_index(suite)
    # If the modules.idx does not exist or invalid for any reason, generate
    # a new one arbitrarily.
    if not modules:
      modules = self.get_testable_module_from_memory(suite)

    return modules

  def get_testable_modules_from_index(self, suite: str = None) -> Set[str]:
    """Return the testable modules of the given suite name from the index file.

    An index that cannot be unpickled (for example because indexing was
    interrupted) is treated as empty so that the caller can regenerate it.

    Args:
      suite: A string of suite name.

    Returns:
      The result of filtering the indexed suite-to-modules mapping by suite.
    """
    suite_to_modules = {}
    # NOTE: pickle must never be pointed at untrusted data. The index path is
    # passed to _get_testable_modules as index_path; presumably that is the
    # only writer of this file -- confirm before reusing this loader elsewhere.
    with open(self.module_index, 'rb') as cache:
      try:
        suite_to_modules = pickle.load(cache, encoding='utf-8')
      except UnicodeDecodeError:
        # The failed attempt has already consumed part of the stream; rewind
        # before retrying with the default encoding.
        cache.seek(0)
        try:
          suite_to_modules = pickle.load(cache)
        except (EOFError, UnicodeDecodeError, pickle.UnpicklingError):
          logging.debug('Unable to load module index %s.', self.module_index)
          suite_to_modules = {}
      # when module indexing was interrupted.
      except (EOFError, pickle.UnpicklingError):
        pass

    return _filter_modules_by_suite(suite_to_modules, suite)

  def get_testable_module_from_memory(self, suite: str = None) -> Set[str]:
    """Return the testable modules of the given suite name from loaded data."""
    return _get_testable_modules(
        name_to_module_info=self.name_to_module_info,
        path_to_module_info=self.path_to_module_info,
        index_path=self.module_index,
        suite=suite,
    )
class ModuleInfo:
  """Class that offers fast/easy lookup for Module related details."""

  def __init__(
      self,
      name_to_module_info: Dict[str, Any] = None,
      path_to_module_info: Dict[str, Any] = None,
      mod_info_file_path: Path = None,
      get_testable_modules: Callable = None,
  ):
    """Initialize the ModuleInfo object.

    Load up the module-info.json file and initialize the helper vars.
    Note that module-info.json does not contain all module dependencies,
    therefore, Atest needs to accumulate dependencies defined in bp files.

    Args:
      name_to_module_info: Dict of name to module info.
      path_to_module_info: Dict of path to module info.
      mod_info_file_path: Path of module-info.json.
      get_testable_modules: Function to get all testable modules.
    """
    # +----------------------+     +----------------------------+
    # | $ANDROID_PRODUCT_OUT |     |$ANDROID_BUILD_TOP/out/soong|
    # |  /module-info.json   |     |  /module_bp_java_deps.json |
    # +-----------+----------+     +-------------+--------------+
    #             |     _merge_soong_info()      |
    #             +------------------------------+
    #             |
    #             v
    # +----------------------------+  +----------------------------+
    # |tempfile.NamedTemporaryFile |  |$ANDROID_BUILD_TOP/out/soong|
    # +-------------+--------------+  |  /module_bp_cc_deps.json   |
    #               |                 +-------------+--------------+
    #               |     _merge_soong_info()       |
    #               +-------------------------------+
    #                              |
    #                      +-------|
    #                      v
    # +============================+
    # |  $ANDROID_PRODUCT_OUT      |
    # |    /atest_merged_dep.json  |--> load as module info.
    # +============================+
    self.root_dir = os.environ.get(constants.ANDROID_BUILD_TOP)

    self.name_to_module_info = name_to_module_info or {}
    self.path_to_module_info = path_to_module_info or {}
    self.mod_info_file_path = mod_info_file_path
    self._get_testable_modules = get_testable_modules

  def is_module(self, name):
    """Return True if name is a module, False otherwise."""
    info = self.get_module_info(name)
    # From aosp/2293302 it started merging all modules' dependency in bp
    # even the module is not be exposed to make, and those modules could not
    # be treated as a build target using m. Only treat input name as module
    # if it also has the module_name attribute which means it could be a
    # build target for m.
    return bool(info and info.get(constants.MODULE_NAME))

  def get_paths(self, name) -> list[str]:
    """Return paths of supplied module name, Empty list if non-existent."""
    info = self.get_module_info(name)
    if info:
      return info.get(constants.MODULE_PATH, [])
    return []

  def get_module_names(self, rel_module_path):
    """Get the modules that all have module_path.

    Args:
      rel_module_path: path of module in module-info.json

    Returns:
      List of module names.
    """
    return _get_module_names(self.path_to_module_info, rel_module_path)

  def get_module_info(self, mod_name):
    """Return dict of info for given module name, None if non-existence."""
    return self.name_to_module_info.get(mod_name)

  @staticmethod
  def is_suite_in_compatibility_suites(suite, mod_info):
    """Check if suite exists in the compatibility_suites of module-info.

    Args:
      suite: A string of suite name.
      mod_info: Dict of module info to check.

    Returns:
      True if it exists in mod_info, False otherwise.
    """
    if not isinstance(mod_info, dict):
      return False
    return suite in mod_info.get(constants.MODULE_COMPATIBILITY_SUITES, [])

  def get_testable_modules(self, suite=None):
    """Return the testable modules reported by the configured provider.

    Args:
      suite: A string of suite name, forwarded to the provider.

    Returns:
      Whatever the provider returns; an empty set when this object was created
      without a provider (e.g. via create_empty()).
    """
    if self._get_testable_modules is None:
      return set()
    return self._get_testable_modules(suite)

  @staticmethod
  def is_tradefed_testable_module(info: Dict[str, Any]) -> bool:
    """Check whether the module is a Tradefed executable test."""
    if not info:
      return False
    if not info.get(constants.MODULE_INSTALLED, []):
      return False
    return ModuleInfo.has_test_config(info)

  @staticmethod
  def is_mobly_module(info: Dict[str, Any]) -> bool:
    """Check whether the module is a Mobly test.

    Note: Only python_test_host modules marked with a test_options tag of
      "mobly" is considered a Mobly module.

    Args:
      info: Dict of module info to check.

    Returns:
      True if this is a Mobly test module, False otherwise.
    """
    if not info:
      return False
    return constants.MOBLY_TEST_OPTIONS_TAG in info.get(
        constants.MODULE_TEST_OPTIONS_TAGS, []
    )

  def is_testable_module(self, info: Dict[str, Any]) -> bool:
    """Check if module is something we can test.

    A module is testable if:
      - it's a tradefed testable module, or
      - it's a Mobly module, or
      - it's a robolectric module (or shares path with one).

    Args:
      info: Dict of module info to check.

    Returns:
      True if we can test this module, False otherwise.
    """
    return _is_testable_module(
        self.name_to_module_info, self.path_to_module_info, info
    )

  @staticmethod
  def has_test_config(info: Dict[str, Any]) -> bool:
    """Validate if this module has a test config.

    A module can have a test config in the following manner:
      - test_config be set in module-info.json.
      - Auto-generated config via the auto_test_config key
        in module-info.json.

    Args:
      info: Dict of module info to check.

    Returns:
      True if this module has a test config, False otherwise.
    """
    return bool(
        info.get(constants.MODULE_TEST_CONFIG, [])
        or info.get('auto_test_config', [])
    )

  def is_legacy_robolectric_test(self, info: Dict[str, Any]) -> bool:
    """Return whether the module_name is a legacy Robolectric test"""
    return _is_legacy_robolectric_test(
        self.name_to_module_info, self.path_to_module_info, info
    )

  def get_robolectric_test_name(self, info: Dict[str, Any]) -> str:
    """Returns runnable robolectric module name.

    This method is for legacy robolectric tests and returns one of associated
    modules. The pattern is determined by the amount of shards:

    10 shards:
        FooTests -> RunFooTests0, RunFooTests1 ... RunFooTests9
    No shard:
        FooTests -> RunFooTests

    Arg:
      info: Dict of module info to check.

    Returns:
      String of the first-matched associated module that belongs to the
      actual robolectric module, None if nothing has been found.
    """
    return _get_robolectric_test_name(
        self.name_to_module_info, self.path_to_module_info, info
    )

  def is_robolectric_test(self, module_name):
    """Check if the given module is a robolectric test.

    Args:
      module_name: String of module to check.

    Returns:
      Boolean whether it's a robotest or not.
    """
    return bool(self.get_robolectric_type(module_name))

  def get_robolectric_type(self, module_name: str) -> int:
    """Check if the given module is a robolectric test and return type of it.

    Robolectric declaration is converting from Android.mk to Android.bp, and
    in the interim Atest needs to support testing both types of tests.

    The modern robolectric tests defined by 'android_robolectric_test' in an
    Android.bp file can be run in Tradefed Test Runner:

        SettingsRoboTests -> Tradefed Test Runner

    Legacy tests defined in an Android.mk can only run with the 'make' way.

        SettingsRoboTests -> make RunSettingsRoboTests0

    To determine whether the test is a modern/legacy robolectric test:
      1. If the 'robolectric-test` in the compatibility_suites, it's a
         modern one, otherwise it's a legacy test. This is accurate since
         aosp/2308586 already set the test suite of `robolectric-test`
         for all `modern` Robolectric tests in Soong.
      2. Traverse all modules share the module path. If one of the
         modules has a ROBOLECTRIC class, it's a legacy robolectric test.

    Args:
      module_name: String of module to check.

    Returns:
      0: not a robolectric test.
      1: a modern robolectric test(defined in Android.bp)
      2: a legacy robolectric test(defined in Android.mk)
    """
    info = self.get_module_info(module_name)
    if not info:
      return 0
    # Some Modern mode Robolectric test has related module which compliant
    # with the Legacy Robolectric test. In this case, the Modern mode
    # Robolectric tests should be prior to the Legacy mode.
    if self.is_modern_robolectric_test(info):
      return constants.ROBOTYPE_MODERN
    if self.is_legacy_robolectric_test(info):
      return constants.ROBOTYPE_LEGACY
    return 0

  def get_instrumentation_target_apps(self, module_name: str) -> Dict:
    """Return target APKs of an instrumentation test.

    Args:
      module_name: String of the instrumentation test module name.

    Returns:
      A dict of target module and target APK(s). e.g.
      {"FooService": {"/path/to/the/FooService.apk"}}
    """
    # 1. Determine the actual manifest filename from an Android.bp(if any)
    manifest = self.get_filepath_from_module(module_name, 'AndroidManifest.xml')
    bpfile = self.get_filepath_from_module(module_name, 'Android.bp')
    if bpfile.is_file():
      bp_info = atest_utils.get_bp_content(bpfile, 'android_test')
      if not bp_info or not bp_info.get(module_name):
        return {}
      # Keep the default AndroidManifest.xml when the bp entry carries no
      # manifest name; joining a None filename would raise.
      manifest_name = bp_info.get(module_name).get('manifest')
      if manifest_name:
        manifest = self.get_filepath_from_module(module_name, manifest_name)
    xml_info = atest_utils.get_manifest_info(manifest)
    # 2. Translate package name to a module name.
    package = xml_info.get('package')
    target_package = xml_info.get('target_package')
    # Ensure it's an instrumentation test(excluding self-instrumented)
    if target_package and package != target_package:
      logging.debug('Found %s an instrumentation test.', module_name)
      metrics.LocalDetectEvent(
          detect_type=DetectType.FOUND_INSTRUMENTATION_TEST, result=1
      )
      target_module = self.get_target_module_by_pkg(
          package=target_package, search_from=manifest.parent
      )
      if target_module:
        return self.get_artifact_map(target_module)
    return {}

  def get_target_module_by_pkg(self, package: str, search_from: Path) -> str:
    """Translate package name to the target module name.

    This method is dedicated to determine the target module by translating
    a package name.

    Phase 1: Find out possible manifest files among parent directories.
    Phase 2. Look for the defined package fits the given name, and ensure
             it is not a persistent app.
    Phase 3: Translate the manifest path to possible modules. A valid module
             must fulfill:
             1. The 'class' type must be ['APPS'].
             2. It is not a Robolectric test.

    Args:
      package: String of the package name to look for.
      search_from: Directory whose parents (up to, but excluding, the source
        root) are scanned for manifest files.

    Returns:
      A string of module name, or an empty string when nothing matches.
    """
    root = Path(self.root_dir)
    manifest_pattern = re.compile(r'.*AndroidManifest.*\.xml$')
    xmls = []
    for pth in search_from.parents:
      if pth == root:
        break
      for name in os.listdir(pth):
        if pth.joinpath(name).is_file() and manifest_pattern.match(name):
          xmls.append(os.path.join(pth, name))
    possible_modules = []
    for xml in xmls:
      rel_dir = str(Path(xml).relative_to(self.root_dir).parent)
      logging.debug('Looking for package "%s" in %s...', package, xml)
      xml_info = atest_utils.get_manifest_info(xml)
      if xml_info.get('package') != package:
        continue
      if xml_info.get('persistent'):
        logging.debug('%s is a persistent app.', package)
        continue
      # A directory holding a manifest is not necessarily a known module path.
      possible_modules.extend(self.path_to_module_info.get(rel_dir) or [])
    for mod in possible_modules:
      name = mod.get('module_name')
      if mod.get('class') == ['APPS'] and not self.is_robolectric_test(name):
        return name
    return ''

  def get_artifact_map(self, module_name: str) -> Dict:
    """Get the installed APK path of the given module.

    Args:
      module_name: String of the module name.

    Returns:
      A dict mapping module_name to the set of its installed '.apk' paths
      joined onto the source root; an empty dict when the module has no info.
    """
    target_mod_info = self.get_module_info(module_name)
    artifact_map = {}
    if target_mod_info:
      # 'installed' may be absent; treat that as no artifacts.
      artifacts = target_mod_info.get('installed') or []
      apks = {
          os.path.join(self.root_dir, artifact)
          for artifact in artifacts
          if Path(artifact).suffix == '.apk'
      }
      artifact_map.update({module_name: apks})
    return artifact_map

  def is_auto_gen_test_config(self, module_name):
    """Check if the test config file will be generated automatically.

    Args:
      module_name: A string of the module name.

    Returns:
      True if the test config file will be generated automatically.
    """
    if self.is_module(module_name):
      mod_info = self.get_module_info(module_name)
      auto_test_config = mod_info.get('auto_test_config', [])
      return bool(auto_test_config and auto_test_config[0])
    return False

  @staticmethod
  def is_legacy_robolectric_class(info: Dict[str, Any]) -> bool:
    """Check if the class is `ROBOLECTRIC`

    This method is for legacy robolectric tests that the associated modules
    contain:
        'class': ['ROBOLECTRIC']

    Args:
      info: ModuleInfo to check.

    Returns:
      True if the attribute class in mod_info is ROBOLECTRIC, False
      otherwise.
    """
    if not info:
      return False
    module_classes = info.get(constants.MODULE_CLASS, [])
    return bool(
        module_classes
        and module_classes[0] == constants.MODULE_CLASS_ROBOLECTRIC
    )

  def is_native_test(self, module_name):
    """Check if the input module is a native test.

    Args:
      module_name: A string of the module name.

    Returns:
      True if the test is a native test, False otherwise (including when the
      module is unknown).
    """
    mod_info = self.get_module_info(module_name)
    if not mod_info:
      return False
    return constants.MODULE_CLASS_NATIVE_TESTS in mod_info.get(
        constants.MODULE_CLASS, []
    )

  def has_mainline_modules(
      self, module_name: str, mainline_binaries: List[str]
  ) -> bool:
    """Check if the mainline modules are in module-info.

    Args:
      module_name: A string of the module name.
      mainline_binaries: A list of mainline module binaries.

    Returns:
      True if mainline_binaries is in module-info, False otherwise (including
      when the module is unknown).
    """
    mod_info = self.get_module_info(module_name)
    if not mod_info:
      return False
    # Check 'test_mainline_modules' attribute of the module-info.json.
    mm_in_mf = mod_info.get(constants.MODULE_MAINLINE_MODULES, [])
    ml_modules_set = set(mainline_binaries)
    if mm_in_mf:
      return contains_same_mainline_modules(ml_modules_set, set(mm_in_mf))
    # Every path through the loop body returns, so only the first test
    # config is ever consulted.
    for test_config in mod_info.get(constants.MODULE_TEST_CONFIG, []):
      # Check the value of 'mainline-param' in the test config.
      if not self.is_auto_gen_test_config(module_name):
        return contains_same_mainline_modules(
            ml_modules_set,
            atest_utils.get_mainline_param(
                os.path.join(self.root_dir, test_config)
            ),
        )
      # Unable to verify mainline modules in an auto-gen test config.
      logging.debug(
          '%s is associated with an auto-generated test config.', module_name
      )
      return True
    return False

  def get_filepath_from_module(self, module_name: str, filename: str) -> Path:
    """Return absolute path of the given module and filename.

    Args:
      module_name: String of the module name.
      filename: Name of the file inside the module's first path.

    Returns:
      The source root joined with the module's first path and filename, or an
      empty Path() when the module has no path.
    """
    mod_path = self.get_paths(module_name)
    if mod_path:
      return Path(self.root_dir).joinpath(mod_path[0], filename)
    return Path()

  def get_module_dependency(self, module_name, depend_on=None):
    """Get the dependency sets for input module.

    Recursively find all the dependencies of the input module.

    Args:
      module_name: String of module to check.
      depend_on: Iterable of parent dependencies that must not be visited
        again.

    Returns:
      Set of dependency modules.
    """
    # Accept any iterable (the parameter has always been described as a list)
    # while doing set arithmetic below.
    depend_on = set(depend_on) if depend_on else set()
    mod_info = self.get_module_info(module_name)
    if not mod_info:
      return set()
    # Remove item in deps if it already in depend_on:
    mod_deps = set(mod_info.get(constants.MODULE_DEPENDENCIES, [])) - depend_on
    deps = set(mod_deps)
    for mod_dep in mod_deps:
      deps |= self.get_module_dependency(mod_dep, depend_on=depend_on | deps)
    return deps

  def get_install_module_dependency(self, module_name, depend_on=None):
    """Get the dependency set for the given modules with installed path.

    Args:
      module_name: String of module to check.
      depend_on: The list of parent dependencies.

    Returns:
      Set of dependency modules which has installed path.
    """
    install_deps = set()
    deps = self.get_module_dependency(module_name, depend_on)
    logging.debug('%s depends on: %s', module_name, deps)
    for module in deps:
      mod_info = self.get_module_info(module)
      if mod_info and mod_info.get(constants.MODULE_INSTALLED, []):
        install_deps.add(module)
    logging.debug(
        'modules %s required by %s were not installed',
        install_deps,
        module_name,
    )
    return install_deps

  @staticmethod
  def is_unit_test(mod_info):
    """Return True if input module is unit test, False otherwise.

    Args:
      mod_info: ModuleInfo to check.

    Returns:
      True if input module is unit test, False otherwise.
    """
    return mod_info.get(constants.MODULE_IS_UNIT_TEST, '') == 'true'

  def is_host_unit_test(self, info: Dict[str, Any]) -> bool:
    """Return True if input module is host unit test, False otherwise.

    Args:
      info: ModuleInfo to check.

    Returns:
      True if input module is host unit test, False otherwise.
    """
    return self.is_tradefed_testable_module(
        info
    ) and self.is_suite_in_compatibility_suites('host-unit-tests', info)

  def is_modern_robolectric_test(self, info: Dict[str, Any]) -> bool:
    """Return whether 'robolectric-tests' is in 'compatibility_suites'."""
    return self.is_tradefed_testable_module(
        info
    ) and self.is_robolectric_test_suite(info)

  def is_robolectric_test_suite(self, mod_info) -> bool:
    """Return True if 'robolectric-tests' in the compatibility_suites.

    Args:
      mod_info: ModuleInfo to check.

    Returns:
      True if the 'robolectric-tests' is in the compatibility_suites,
      False otherwise.
    """
    return self.is_suite_in_compatibility_suites('robolectric-tests', mod_info)

  def is_ravenwood_test(self, info: Dict[str, Any]) -> bool:
    """Return whether 'ravenwood-tests' is in 'compatibility_suites'."""
    return self.is_tradefed_testable_module(
        info
    ) and self.is_ravenwood_test_suite(info)

  def is_ravenwood_test_suite(self, mod_info) -> bool:
    """Return True if 'ravenwood-tests' in the compatibility_suites.

    Args:
      mod_info: ModuleInfo to check.

    Returns:
      True if the 'ravenwood-tests' is in the compatibility_suites,
      False otherwise.
    """
    return self.is_suite_in_compatibility_suites('ravenwood-tests', mod_info)

  def is_device_driven_test(self, mod_info):
    """Return True if input module is device driven test, False otherwise.

    Args:
      mod_info: ModuleInfo to check.

    Returns:
      True if input module is device driven test, False otherwise.
    """
    if self.is_robolectric_test_suite(mod_info):
      return False
    if self.is_ravenwood_test_suite(mod_info):
      return False

    return self.is_tradefed_testable_module(
        mod_info
    ) and 'DEVICE' in mod_info.get(constants.MODULE_SUPPORTED_VARIANTS, [])

  def is_host_driven_test(self, mod_info):
    """Return True if input module is host driven test, False otherwise.

    Args:
      mod_info: ModuleInfo to check.

    Returns:
      True if input module is host driven test, False otherwise.
    """
    return self.is_tradefed_testable_module(
        mod_info
    ) and 'HOST' in mod_info.get(constants.MODULE_SUPPORTED_VARIANTS, [])

  def _any_module(self, _: Module) -> bool:
    """Predicate that accepts every module."""
    return True

  def get_all_tests(self):
    """Get a list of all the module names which are tests."""
    return self._get_all_modules(type_predicate=self.is_testable_module)

  def get_all_unit_tests(self):
    """Get a list of all the module names which are unit tests."""
    return self._get_all_modules(type_predicate=ModuleInfo.is_unit_test)

  def get_all_host_unit_tests(self):
    """Get a list of all the module names which are host unit tests."""
    return self._get_all_modules(type_predicate=self.is_host_unit_test)

  def get_all_device_driven_tests(self):
    """Get a list of all the module names which are device driven tests."""
    return self._get_all_modules(type_predicate=self.is_device_driven_test)

  def _get_all_modules(self, type_predicate=None):
    """Get a list of all the module names that passed the predicate.

    Only entries whose module_name attribute equals their key are considered.

    Args:
      type_predicate: Callable taking a module info dict; defaults to
        accepting every module.

    Returns:
      List of matching module names.
    """
    type_predicate = type_predicate or self._any_module
    return [
        mod_name
        for mod_name, mod_info in self.name_to_module_info.items()
        if mod_info.get(constants.MODULE_NAME, '') == mod_name
        and type_predicate(mod_info)
    ]
testable_modules_only: bool = False 1081 ) -> Set[str]: 1082 """Get the module name that the given path belongs to.(in 'srcs') 1083 1084 Args: 1085 path: file path which is relative to ANDROID_BUILD_TOP. 1086 testable_modules_only: boolean flag which determines whether search 1087 testable modules only or not. 1088 1089 Returns: 1090 A set of string for matched module names, empty set if nothing find. 1091 """ 1092 modules = set() 1093 1094 for mod_name in ( 1095 self.get_testable_modules() 1096 if testable_modules_only 1097 else self.name_to_module_info.keys() 1098 ): 1099 m_info = self.get_module_info(mod_name) 1100 if m_info: 1101 for src in m_info.get(constants.MODULE_SRCS, []): 1102 if src in path: 1103 modules.add(mod_name) 1104 1105 return modules 1106 1107 def get_modules_by_path( 1108 self, path: str, testable_modules_only: bool = False 1109 ) -> set[str]: 1110 """Get the module names that the give path belongs to. 1111 1112 Args: 1113 path: dir path for searching among `path` in module information. 1114 testable_modules_only: boolean flag which determines whether search 1115 testable modules only or not. 1116 1117 Returns: 1118 A set of module names. 1119 """ 1120 modules = set() 1121 is_testable_module_fn = ( 1122 self.is_testable_module if testable_modules_only else lambda _: True 1123 ) 1124 1125 m_infos = self.path_to_module_info.get(path) 1126 if m_infos: 1127 modules = { 1128 info.get(constants.MODULE_NAME) 1129 for info in m_infos 1130 if is_testable_module_fn(info) 1131 } 1132 1133 return modules 1134 1135 def get_modules_by_include_deps( 1136 self, deps: Set[str], testable_module_only: bool = False 1137 ) -> Set[str]: 1138 """Get the matched module names for the input dependencies. 1139 1140 Args: 1141 deps: A set of string for dependencies. 1142 testable_module_only: Option if only want to get testable module. 1143 1144 Returns: 1145 A set of matched module names for the input dependencies. 
1146 """ 1147 modules = set() 1148 1149 for mod_name in ( 1150 self.get_testable_modules() 1151 if testable_module_only 1152 else self.name_to_module_info.keys() 1153 ): 1154 mod_info = self.get_module_info(mod_name) 1155 if mod_info and deps.intersection( 1156 set(mod_info.get(constants.MODULE_DEPENDENCIES, [])) 1157 ): 1158 modules.add(mod_info.get(constants.MODULE_NAME)) 1159 return modules 1160 1161 def get_installed_paths(self, module_name: str) -> List[Path]: 1162 """Return installed path from module info.""" 1163 mod_info = self.get_module_info(module_name) 1164 if not mod_info: 1165 return [] 1166 1167 def _to_abs_path(p): 1168 if os.path.isabs(p): 1169 return Path(p) 1170 return Path(os.getenv(constants.ANDROID_BUILD_TOP), p) 1171 1172 return [_to_abs_path(p) for p in mod_info.get('installed', [])] 1173 1174 def get_code_under_test(self, module_name: str) -> List[str]: 1175 """Return code under test from module info.""" 1176 mod_info = self.get_module_info(module_name) 1177 if not mod_info: 1178 atest_utils.colorful_print( 1179 '\nmodule %s cannot be found in module info, skip generating' 1180 ' coverage for it.' % module_name, 1181 constants.YELLOW, 1182 ) 1183 return [] 1184 1185 return mod_info.get('code_under_test', []) 1186 1187 def build_variants(self, info: Dict[str, Any]) -> List[str]: 1188 return info.get(constants.MODULE_SUPPORTED_VARIANTS, []) 1189 1190 def requires_device(self, info: Dict[str, Any]) -> bool: 1191 1192 if self.is_modern_robolectric_test(info): 1193 return False 1194 if self.is_ravenwood_test(info): 1195 return False 1196 if self.is_host_unit_test(info) and 'DEVICE' not in self.build_variants( 1197 info 1198 ): 1199 return False 1200 1201 return True 1202 1203 1204def _create_db(data_map: Dict[str, Dict[str, Any]], db_path: Path): 1205 """Create a Sqlite DB by writing to tempfile and move it to the right place. 1206 1207 Args: 1208 data_map: A dict where the key is table name and value is data itself. 
1209 db_path: A Path pointing to the DB file. 1210 """ 1211 if db_path.is_file(): 1212 db_path.unlink() 1213 1214 with tempfile.NamedTemporaryFile(delete=False) as tmp_db: 1215 _create_db_in_path(data_map, tmp_db.name) 1216 shutil.move(tmp_db.name, db_path) 1217 1218 logging.debug('%s is created successfully.', db_path) 1219 1220 1221def _create_db_in_path(data_map: Dict[str, Dict[str, Any]], db_path: Path): 1222 """Create a Sqlite DB with multiple tables. 1223 1224 Args: 1225 data_map: A dict where the key is table name and value is data itself. 1226 db_path: A Path pointing to the DB file. 1227 """ 1228 con = sqlite3.connect(db_path) 1229 with con: 1230 cur = con.cursor() 1231 for table, contents in data_map.items(): 1232 cur.execute(f'CREATE TABLE {table}(key TEXT PRIMARY KEY, value TEXT)') 1233 1234 data = [] 1235 for k, v in contents.items(): 1236 data.append({'key': k, 'value': json.dumps(v)}) 1237 cur.executemany(f'INSERT INTO {table} VALUES(:key, :value)', data) 1238 1239 1240def _create_json(data_map: Dict[str, Any], json_path: Path): 1241 """Write content onto a JSON file. 1242 1243 Args: 1244 data_map: A dict where the key is table name and value is data itself. 1245 json_path: A Path pointing to the JSON file. 1246 """ 1247 if json_path.is_file(): 1248 json_path.unlink() 1249 1250 with tempfile.NamedTemporaryFile(delete=False) as temp_json: 1251 with open(temp_json.name, 'w', encoding='utf-8') as _temp: 1252 json.dump(data_map, _temp, indent=0) 1253 shutil.move(temp_json.name, json_path) 1254 1255 logging.debug('%s is created successfully.', json_path) 1256 1257 1258def _save_data_async(function: Callable, contents: Any, target_path: Path): 1259 """Save contents to a static file in asynchronized manner.""" 1260 atest_utils.run_multi_proc( 1261 func=function, 1262 args=[contents, target_path], 1263 # We set `daemon` to `False` to make sure that Atest doesn't exit before 1264 # writing the cache file. 
1265 daemon=False, 1266 ) 1267 1268 1269def merge_soong_info(name_to_module_info, mod_bp_infos): 1270 """Merge the dependency and srcs in mod_bp_infos to name_to_module_info. 1271 1272 Args: 1273 name_to_module_info: Dict of module name to module info dict. 1274 mod_bp_infos: Dict of module name to bp's module info dict. 1275 1276 Returns: 1277 Dict of updated name_to_module_info. 1278 """ 1279 merge_items = [ 1280 constants.MODULE_DEPENDENCIES, 1281 constants.MODULE_SRCS, 1282 constants.MODULE_LIBS, 1283 constants.MODULE_STATIC_LIBS, 1284 constants.MODULE_STATIC_DEPS, 1285 constants.MODULE_PATH, 1286 ] 1287 for module_name, dep_info in mod_bp_infos.items(): 1288 mod_info = name_to_module_info.setdefault(module_name, {}) 1289 for merge_item in merge_items: 1290 dep_info_values = dep_info.get(merge_item, []) 1291 mod_info_values = mod_info.get(merge_item, []) 1292 mod_info_values.extend(dep_info_values) 1293 mod_info_values.sort() 1294 # deduplicate values just in case. 1295 mod_info_values = list(dict.fromkeys(mod_info_values)) 1296 name_to_module_info[module_name][merge_item] = mod_info_values 1297 return name_to_module_info 1298 1299 1300def _add_missing_variant_modules(name_to_module_info: Dict[str, Module]): 1301 missing_modules = {} 1302 1303 # Android's build system automatically adds a suffix for some build module 1304 # variants. For example, a module-info entry for a module originally named 1305 # 'HelloWorldTest' might appear as 'HelloWorldTest_32' and which Atest would 1306 # not be able to find. We add such entries if not already present so they 1307 # can be looked up using their declared module name. 
1308 for mod_name, mod_info in name_to_module_info.items(): 1309 declared_module_name = mod_info.get(constants.MODULE_NAME, mod_name) 1310 if declared_module_name in name_to_module_info: 1311 continue 1312 missing_modules.setdefault(declared_module_name, mod_info) 1313 1314 name_to_module_info.update(missing_modules) 1315 1316 1317def contains_same_mainline_modules( 1318 mainline_modules: Set[str], module_lists: Set[str] 1319): 1320 """Check if mainline modules listed on command line is 1321 1322 the same set as config. 1323 1324 Args: 1325 mainline_modules: A list of mainline modules from triggered test. 1326 module_lists: A list of concatenate mainline module string from test 1327 configs. 1328 1329 Returns 1330 True if the set mainline modules from triggered test is in the test 1331 configs. 1332 """ 1333 for module_string in module_lists: 1334 if mainline_modules == set(module_string.split('+')): 1335 return True 1336 return False 1337 1338 1339def get_path_to_module_info(name_to_module_info): 1340 """Return the path_to_module_info dict. 1341 1342 Args: 1343 name_to_module_info: Dict of module name to module info dict. 1344 1345 Returns: 1346 Dict of module path to module info dict. 1347 """ 1348 path_to_module_info = {} 1349 for mod_name, mod_info in name_to_module_info.items(): 1350 # Cross-compiled and multi-arch modules actually all belong to 1351 # a single target so filter out these extra modules. 1352 if mod_name != mod_info.get(constants.MODULE_NAME, ''): 1353 continue 1354 for path in mod_info.get(constants.MODULE_PATH, []): 1355 mod_info[constants.MODULE_NAME] = mod_name 1356 # There could be multiple modules in a path. 1357 if path in path_to_module_info: 1358 path_to_module_info[path].append(mod_info) 1359 else: 1360 path_to_module_info[path] = [mod_info] 1361 return path_to_module_info 1362 1363 1364def _get_module_names(path_to_module_info, rel_module_path): 1365 """Get the modules that all have module_path. 
1366 1367 Args: 1368 path_to_module_info: Dict of path to module info. 1369 rel_module_path: path of module in module-info.json. 1370 1371 Returns: 1372 List of module names. 1373 """ 1374 return [ 1375 m.get(constants.MODULE_NAME) 1376 for m in path_to_module_info.get(rel_module_path, []) 1377 ] 1378 1379 1380def _get_robolectric_test_name( 1381 name_to_module_info: Dict[str, Dict], 1382 path_to_module_info: Dict[str, Dict], 1383 info: Dict[str, Any], 1384) -> str: 1385 """Returns runnable robolectric module name. 1386 1387 This method is for legacy robolectric tests and returns one of associated 1388 modules. The pattern is determined by the amount of shards: 1389 1390 10 shards: 1391 FooTests -> RunFooTests0, RunFooTests1 ... RunFooTests9 1392 No shard: 1393 FooTests -> RunFooTests 1394 1395 Arg: 1396 name_to_module_info: Dict of name to module info. 1397 path_to_module_info: Dict of path to module info. 1398 info: Dict of module info to check. 1399 1400 Returns: 1401 String of the first-matched associated module that belongs to the 1402 actual robolectric module, None if nothing has been found. 
1403 """ 1404 if not info: 1405 return '' 1406 module_paths = info.get(constants.MODULE_PATH, []) 1407 if not module_paths: 1408 return '' 1409 filtered_module_names = [ 1410 name 1411 for name in _get_module_names(path_to_module_info, module_paths[0]) 1412 if name.startswith('Run') 1413 ] 1414 return next( 1415 ( 1416 name 1417 for name in filtered_module_names 1418 if ModuleInfo.is_legacy_robolectric_class( 1419 name_to_module_info.get(name) 1420 ) 1421 ), 1422 '', 1423 ) 1424 1425 1426def _is_legacy_robolectric_test( 1427 name_to_module_info: Dict[str, Dict], 1428 path_to_module_info: Dict[str, Dict], 1429 info: Dict[str, Any], 1430) -> bool: 1431 """Return whether the module_name is a legacy Robolectric test""" 1432 if ModuleInfo.is_tradefed_testable_module(info): 1433 return False 1434 return bool( 1435 _get_robolectric_test_name(name_to_module_info, path_to_module_info, info) 1436 ) 1437 1438 1439def get_module_info_target() -> str: 1440 """Get module info target name for soong_ui.bash""" 1441 build_top = atest_utils.get_build_top() 1442 module_info_path = atest_utils.get_product_out(_MODULE_INFO) 1443 if module_info_path.is_relative_to(build_top): 1444 return str(module_info_path.relative_to(build_top)) 1445 1446 logging.debug('Found customized OUT_DIR!') 1447 return str(module_info_path) 1448 1449 1450def build(): 1451 """Build module-info.json""" 1452 logging.debug( 1453 'Generating %s - this is required for initial runs or forced rebuilds.', 1454 _MODULE_INFO, 1455 ) 1456 build_start = time.time() 1457 if not atest_utils.build([get_module_info_target()]): 1458 sys.exit(ExitCode.BUILD_FAILURE) 1459 1460 metrics.LocalDetectEvent( 1461 detect_type=DetectType.ONLY_BUILD_MODULE_INFO, 1462 result=int(time.time() - build_start), 1463 ) 1464 1465 1466def _is_testable_module( 1467 name_to_module_info: Dict[str, Dict], 1468 path_to_module_info: Dict[str, Dict], 1469 info: Dict[str, Any], 1470) -> bool: 1471 """Check if module is something we can test. 
1472 1473 A module is testable if: 1474 - it's a tradefed testable module, or 1475 - it's a Mobly module, or 1476 - it's a robolectric module (or shares path with one). 1477 1478 Args: 1479 name_to_module_info: Dict of name to module info. 1480 path_to_module_info: Dict of path to module info. 1481 info: Dict of module info to check. 1482 1483 Returns: 1484 True if we can test this module, False otherwise. 1485 """ 1486 if not info or not info.get(constants.MODULE_NAME): 1487 return False 1488 if ModuleInfo.is_tradefed_testable_module(info): 1489 return True 1490 if ModuleInfo.is_mobly_module(info): 1491 return True 1492 if _is_legacy_robolectric_test( 1493 name_to_module_info, path_to_module_info, info 1494 ): 1495 return True 1496 return False 1497 1498 1499def _get_testable_modules( 1500 name_to_module_info: Dict[str, Dict], 1501 path_to_module_info: Dict[str, Dict], 1502 suite: str = None, 1503 index_path: Path = None, 1504): 1505 """Return testable modules of the given suite name.""" 1506 suite_to_modules = _get_suite_to_modules( 1507 name_to_module_info, path_to_module_info, index_path 1508 ) 1509 1510 return _filter_modules_by_suite(suite_to_modules, suite) 1511 1512 1513def _get_suite_to_modules( 1514 name_to_module_info: Dict[str, Dict], 1515 path_to_module_info: Dict[str, Dict], 1516 index_path: Path = None, 1517) -> Dict[str, Set[str]]: 1518 """Map suite and its modules. 1519 1520 Args: 1521 name_to_module_info: Dict of name to module info. 1522 path_to_module_info: Dict of path to module info. 1523 index_path: Path of the stored content. 1524 1525 Returns: 1526 Dict of suite and testable modules mapping. 
1527 """ 1528 suite_to_modules = {} 1529 1530 for _, info in name_to_module_info.items(): 1531 if _is_testable_module(name_to_module_info, path_to_module_info, info): 1532 testable_module = info.get(constants.MODULE_NAME) 1533 suites = ( 1534 info.get('compatibility_suites') 1535 if info.get('compatibility_suites') 1536 else ['null-suite'] 1537 ) 1538 1539 for suite in suites: 1540 suite_to_modules.setdefault(suite, set()).add(testable_module) 1541 1542 if index_path: 1543 _index_testable_modules(suite_to_modules, index_path) 1544 1545 return suite_to_modules 1546 1547 1548def _filter_modules_by_suite( 1549 suite_to_modules: Dict[str, Set[str]], 1550 suite: str = None, 1551) -> Set[str]: 1552 """Return modules of the given suite name.""" 1553 if suite: 1554 return suite_to_modules.get(suite) 1555 1556 return {mod for mod_set in suite_to_modules.values() for mod in mod_set} 1557 1558 1559def _index_testable_modules(contents: Any, index_path: Path): 1560 """Dump testable modules. 1561 1562 Args: 1563 content: An object that will be written to the index file. 1564 index_path: Path to the saved index file. 1565 """ 1566 logging.debug( 1567 r'Indexing testable modules... ' 1568 r'(This is required whenever module-info.json ' 1569 r'was rebuilt.)' 1570 ) 1571 index_path.parent.mkdir(parents=True, exist_ok=True) 1572 with tempfile.NamedTemporaryFile(delete=False) as cache: 1573 try: 1574 pickle.dump(contents, cache, protocol=2) 1575 shutil.move(cache.name, index_path) 1576 logging.debug('%s is created successfully.', index_path) 1577 except IOError: 1578 atest_utils.print_and_log_error('Failed in dumping %s', cache) 1579 os.remove(cache.name) 1580 1581 1582class SqliteDict(collections.abc.Mapping): 1583 """A class that loads a Sqlite DB as a dictionary-like object. 1584 1585 Args: 1586 conn: A connection to the Sqlite database. 1587 table_name: A string the table name. 
1588 """ 1589 1590 def __init__(self, conn: sqlite3.Connection, table_name: str): 1591 """Initialize the SqliteDict instance.""" 1592 self.conn = conn 1593 self.table = table_name 1594 1595 def __iter__(self) -> str: 1596 """Iterate over the keys in the SqliteDict.""" 1597 for key in self._load_key_rows(): 1598 yield key[0] 1599 1600 def _load_key_rows(self) -> Set[str]: 1601 """Load the key rows from the database table.""" 1602 results = self.conn.execute(f'SELECT key FROM {self.table}').fetchall() 1603 return set(results) 1604 1605 def __len__(self) -> int: 1606 """Get the size of key-value pairs in the SqliteDict.""" 1607 return len(self._load_key_rows()) 1608 1609 def __getitem__(self, key) -> Dict[str, Any]: 1610 """Get the value associated with the specified key.""" 1611 result = self.conn.execute( 1612 f'SELECT value FROM {self.table} WHERE key = ?', (key,) 1613 ).fetchone() 1614 if result: 1615 return json.loads(result[0]) 1616 raise KeyError(f'Bad key: {key}') 1617 1618 def items(self) -> Tuple[str, Dict[str, Any]]: 1619 """Iterate over the key-value pairs in the SqliteDict.""" 1620 for key in self: 1621 value = self[key] 1622 yield key, value 1623