1# Copyright 2023 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15
16"""Avatar runner."""
17
18import inspect
19import logging
20import os
21import pathlib
22
23from importlib.machinery import SourceFileLoader
24from mobly import base_test
25from mobly import config_parser
26from mobly import signals
27from mobly import test_runner
28from typing import Dict, List, Tuple, Type
29
30_BUMBLE_BTSNOOP_FMT = 'bumble_btsnoop_{pid}_{instance}.log'
31
32
33class SuiteRunner:
34    test_beds: List[str] = []
35    test_run_configs: List[config_parser.TestRunConfig] = []
36    test_classes: List[Type[base_test.BaseTestClass]] = []
37    test_filters: List[str] = []
38    logs_dir: pathlib.Path = pathlib.Path('out')
39    logs_verbose: bool = False
40
41    def set_logs_dir(self, path: pathlib.Path) -> None:
42        self.logs_dir = path
43
44    def set_logs_verbose(self, verbose: bool = True) -> None:
45        self.logs_verbose = verbose
46
47    def add_test_beds(self, test_beds: List[str]) -> None:
48        self.test_beds += test_beds
49
50    def add_test_filters(self, test_filters: List[str]) -> None:
51        self.test_filters += test_filters
52
53    def add_config_file(self, path: pathlib.Path) -> None:
54        self.test_run_configs += config_parser.load_test_config_file(str(path))  # type: ignore
55
56    def add_test_class(self, cls: Type[base_test.BaseTestClass]) -> None:
57        self.test_classes.append(cls)
58
59    def add_test_module(self, path: pathlib.Path) -> None:
60        try:
61            module = SourceFileLoader(path.stem, str(path)).load_module()
62            classes = inspect.getmembers(module, inspect.isclass)
63            for _, cls in classes:
64                if issubclass(cls, base_test.BaseTestClass):
65                    self.test_classes.append(cls)
66        except ImportError:
67            pass
68
69    def add_path(self, path: pathlib.Path, root: bool = True) -> None:
70        if path.is_file():
71            if path.name.endswith('_test.py'):
72                self.add_test_module(path)
73            elif not self.test_run_configs and not root and path.name in ('config.yml', 'config.yaml'):
74                self.add_config_file(path)
75            elif root:
76                raise ValueError(f'{path} is not a test file')
77        else:
78            for child in path.iterdir():
79                self.add_path(child, root=False)
80
81    def is_included(self, cls: base_test.BaseTestClass, test: str) -> bool:
82        return not self.test_filters or any(filter_match(cls, test, filter) for filter in self.test_filters)
83
84    @property
85    def included_tests(self) -> Dict[Type[base_test.BaseTestClass], Tuple[str, List[str]]]:
86        result: Dict[Type[base_test.BaseTestClass], Tuple[str, List[str]]] = {}
87        for test_class in self.test_classes:
88            cls = test_class(config_parser.TestRunConfig())
89            test_names: List[str] = []
90            try:
91                # Executes pre-setup procedures, this is required since it might
92                # generate test methods that we want to return as well.
93                cls._pre_run()
94                test_names = cls.tests or cls.get_existing_test_names()  # type: ignore
95                test_names = list(test for test in test_names if self.is_included(cls, test))
96                if test_names:
97                    assert cls.TAG
98                    result[test_class] = (cls.TAG, test_names)
99            except Exception:
100                logging.exception('Failed to retrieve generated tests.')
101            finally:
102                cls._clean_up()
103        return result
104
105    def run(self) -> bool:
106        # Create logs directory.
107        if not self.logs_dir.exists():
108            self.logs_dir.mkdir()
109
110        # Enable Bumble snoop logs.
111        os.environ.setdefault('BUMBLE_SNOOPER', f'btsnoop:file:{self.logs_dir}/{_BUMBLE_BTSNOOP_FMT}')
112
113        # Execute the suite
114        ok = True
115        for config in self.test_run_configs:
116            test_bed: str = config.test_bed_name  # type: ignore
117            if self.test_beds and test_bed not in self.test_beds:
118                continue
119            runner = test_runner.TestRunner(config.log_path, config.testbed_name)
120            with runner.mobly_logger(console_level=logging.DEBUG if self.logs_verbose else logging.INFO):
121                for test_class, (_, tests) in self.included_tests.items():
122                    runner.add_test_class(config, test_class, tests)  # type: ignore
123                try:
124                    runner.run()
125                    ok = ok and runner.results.is_all_pass
126                except signals.TestAbortAll:
127                    ok = ok and not self.test_beds
128                except Exception:
129                    logging.exception('Exception when executing %s.', config.testbed_name)
130                    ok = False
131        return ok
132
133
134def filter_match(cls: base_test.BaseTestClass, test: str, filter: str) -> bool:
135    tag: str = cls.TAG  # type: ignore
136    if '.test_' in filter:
137        return f"{tag}.{test}".startswith(filter)
138    if filter.startswith('test_'):
139        return test.startswith(filter)
140    return tag.startswith(filter)
141