1#
2# Copyright (C) 2020 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import os
18import logging
19import pkgutil
20from importlib import resources
21
22import ltp_configs
23import ltp_enums
24import test_case
25from configs import stable_tests
26from configs import disabled_tests
27from common import filter_utils
28from typing import Set, Optional, List, Callable
29
# Template for one generated <option> line of the test config.
# It is filled with %-formatting using (test display name, shell command).
ltp_test_template = (
    '        <option name="test-command-line" key="%s" '
    'value="&ltp_env;; cd $LTPROOT; %s" />'
)
31
class LtpTestCases(object):
    """Load LTP test case definitions and generate a VTS test config from them.

    Attributes:
        _filter_func: function, called with a test display name; a raised
            exception marks the test as filtered
        _ltp_tests_filter: filter_utils.Filter built from the stable test
            names and the disabled test list
        _ltp_binaries: list of string, LTP binary names parsed from gen.bp
        _ltp_config_lines: list of string, the content of the generated config
    """

    def __init__(self, filter_func: Callable):
        self._filter_func = filter_func
        self._ltp_tests_filter = filter_utils.Filter(
            set(stable_tests.STABLE_TESTS.keys()),
            disabled_tests.DISABLED_TESTS,
            enable_regex=True)
        self._ltp_tests_filter.ExpandBitness()
        self._ltp_binaries = []
        self._ltp_config_lines = []

    @staticmethod
    def _ReadResource(package: str, resource: str) -> str:
        """Return the decoded text of a data file bundled inside a package.

        Args:
            package: String, importable package name that holds the resource
            resource: String, resource path relative to the package

        Returns:
            String, the decoded content of the resource.

        Raises:
            FileNotFoundError: if pkgutil cannot provide the data (get_data
                returned None), instead of failing later with an opaque
                AttributeError on None.
        """
        data = pkgutil.get_data(package, resource)
        if data is None:
            raise FileNotFoundError(
                "Resource '%s' not found in package '%s'" % (resource, package))
        return data.decode()

    @staticmethod
    def _GetTargetAndModule(arch: str, n_bit, is_low_mem: bool,
                            is_hwasan: bool) -> tuple:
        """Compute the target name and module name used in the config.

        Args:
            arch: String, arch
            n_bit: int or numeric string, bitness
            is_low_mem: bool, appends '_lowmem' to the module name
            is_hwasan: bool, appends '_hwasan' to the module name

        Returns:
            A (target, module) tuple of strings.
        """
        n_bit = str(n_bit)
        if n_bit == '32':
            # 32-bit names carry no bitness suffix.
            target = arch
            module = f'vts_ltp_test_{arch}'
        else:
            # Only x86 64-bit uses an underscore in the target (x86_64);
            # other arches concatenate directly (e.g. arm64).
            separator = '_' if arch == 'x86' and n_bit == '64' else ''
            target = f'{arch}{separator}{n_bit}'
            module = f'vts_ltp_test_{arch}_{n_bit}'
        if is_low_mem:
            module += '_lowmem'
        if is_hwasan:
            module += '_hwasan'
        return target, module

    def ValidateDefinition(self, line: str) -> Optional[List[str]]:
        """Validate a delimited test case definition.

        Splits the line on ltp_enums.Delimiters.TESTCASE_DEFINITION and
        checks that it has exactly three parts. Leading and trailing white
        space is trimmed from each part.

        Args:
            line: String, one test case definition line

        Returns:
            A list in format [test suite, test name, test command] if the
            definition is valid. None otherwise.
        """
        items = [
            item.strip()
            for item in line.split(ltp_enums.Delimiters.TESTCASE_DEFINITION)
        ]
        if len(items) != 3:
            return None
        return items

    def ReadConfigTemplateFile(self) -> str:
        """Read the template of the config file and return its content.

        Returns:
            String.

        Raises:
            FileNotFoundError: if the bundled template cannot be loaded.
        """
        # This gets bundled into the gen_ltp_config tool during the build
        return self._ReadResource('template', 'template.xml')

    def GetKernelModuleControllerOption(self, arch: str, n_bit: int, is_low_mem: bool = False, is_hwasan: bool = False) -> str:
        """Get the Option of KernelModuleController.

        Args:
            arch: String, arch
            n_bit: int, bitness (a numeric string such as '64' is accepted too)
            is_low_mem: bool, whether to use low memory device configuration
            is_hwasan: bool, whether to use hwasan configuration

        Returns:
            String, three <option> lines (arch, is-low-mem, is-hwasan).
        """
        # Compare as text so that both 64 and '64' select the 64-bit name.
        if str(n_bit) == '64':
            # arm/riscv append the bitness directly (arm64); others use '_'.
            separator = '' if arch in ('arm', 'riscv') else '_'
            arch_name = '{}{}64'.format(arch, separator)
        else:
            arch_name = arch
        option_lines = (
            '        <option name="arch" value="{}"/>\n'
            '        <option name="is-low-mem" value="{}"/>\n'
            '        <option name="is-hwasan" value="{}"/>'
        )
        return option_lines.format(arch_name,
                                   'true' if is_low_mem else 'false',
                                   'true' if is_hwasan else 'false')

    def GetLtpBinaries(self):
        """Load the LTP binary names from gen.bp into self._ltp_binaries.

        Every line starting with 'stem:' or 'filename:' contributes the text
        after its first double quote. The list is rebuilt on each call so
        that repeated calls do not accumulate duplicates.

        Raises:
            FileNotFoundError: if the bundled gen.bp cannot be loaded.
        """
        binaries = []
        for line in self._ReadResource('android.tools', 'gen.bp').splitlines():
            line = line.strip()
            if not line.startswith(('stem:', 'filename:')):
                continue
            parts = line.split('"')
            if len(parts) < 2:
                # No quoted value on this line; nothing to record.
                logging.warning("Cannot parse ltp binary from '%s'", line)
                continue
            binaries.append(parts[1])
        # dict.fromkeys drops duplicates while keeping first-seen order.
        self._ltp_binaries = list(dict.fromkeys(binaries))

    def IsLtpBinaryExist(self, commands: str) -> bool:
        """Check whether any binary used by the command is a known LTP binary.

        Args:
            commands: String, the test command; several commands may be
                separated by ';'

        Returns:
            bool: True if the first word of any command is in
            self._ltp_binaries
        """
        for cmd in commands.split(';'):
            binary_name = cmd.strip().split(' ')[0]
            if binary_name in self._ltp_binaries:
                return True
        logging.info("Ltp binary not exist in cmd of '%s'", commands)
        return False

    def GenConfig(self,
             arch: str,
             n_bit: int,
             test_filter: 'filter_utils.Filter',
             output_file: str,
             run_staging: bool = False,
             is_low_mem: bool = False,
             is_hwasan: bool = False):
        """Read the definition file and generate the test config.

        Args:
            arch: String, arch
            n_bit: int, bitness (a numeric string such as '64' is accepted too)
            test_filter: Filter object, test name filter from base_test
            output_file: String, the file path of the generating config
            run_staging: bool, whether to use staging configuration
            is_low_mem: bool, whether to use low memory device configuration
            is_hwasan: bool, whether to use hwasan configuration

        Raises:
            FileNotFoundError: if a bundled resource cannot be loaded.
        """
        # Normalize once so every bitness comparison below sees text.
        n_bit = str(n_bit)
        self.GetLtpBinaries()
        scenario_groups = (ltp_configs.TEST_SUITES_LOW_MEM
                           if is_low_mem else ltp_configs.TEST_SUITES)
        logging.info('LTP scenario groups: %s', scenario_groups)
        config_lines = self.ReadConfigTemplateFile()
        module_controller_option = self.GetKernelModuleControllerOption(
            arch, n_bit, is_low_mem, is_hwasan)
        mandatory_test_cases = []
        skippable_test_cases = []
        low_mem_suffix = ltp_configs.LOW_MEMORY_SCENARIO_GROUP_SUFFIX
        for line in self.GenerateLtpRunScript(scenario_groups):
            items = self.ValidateDefinition(line)
            if not items:
                continue

            testsuite, testname, command = items
            if is_low_mem and testsuite.endswith(low_mem_suffix):
                testsuite = testsuite[:-len(low_mem_suffix)]

            # Tests failed to build will have prefix "DISABLED_"
            if testname.startswith("DISABLED_"):
                logging.info("[Parser] Skipping test case %s-%s. Reason: "
                             "not built", testsuite, testname)
                continue

            # Some test cases have hardcoded "/tmp" in the command
            # we replace that with ltp_configs.TMPDIR
            command = command.replace('/tmp', ltp_configs.TMPDIR)

            testcase = test_case.TestCase(
                testsuite=testsuite, testname=testname, command=command)
            test_display_name = "{}_{}bit".format(str(testcase), n_bit)

            # Check runner's base_test filtering method. Any exception from
            # the filter marks the test as filtered; the test is only
            # annotated here, the skip decisions are taken below.
            try:
                self._filter_func(test_display_name)
            except Exception:
                logging.info("[Parser] Skipping test case %s. Reason: "
                             "filtered", testcase.fullname)
                testcase.is_filtered = True
                testcase.note = "filtered"

            logging.info('ltp_test_cases Load(): test_display_name = %s\n'
                         'cmd = %s', test_display_name, command)

            # For skipping tests that are not designed or ready for Android,
            # check for bit specific test in disabled list as well as non-bit specific
            is_disabled = (
                self._ltp_tests_filter.IsInExcludeFilter(str(testcase)) or
                self._ltp_tests_filter.IsInExcludeFilter(test_display_name))
            if is_disabled and not test_filter.IsInIncludeFilter(
                    test_display_name):
                logging.info("[Parser] Skipping test case %s. Reason: "
                             "disabled", testcase.fullname)
                continue

            # For separating staging tests from stable tests
            if not self._ltp_tests_filter.IsInIncludeFilter(test_display_name):
                if not run_staging and not test_filter.IsInIncludeFilter(
                        test_display_name):
                    # Skip staging tests in stable run
                    continue
                testcase.is_staging = True
                testcase.note = "staging"
            elif run_staging:
                # Skip stable tests in staging run
                continue

            if not testcase.is_staging:
                if stable_tests.STABLE_TESTS.get(test_display_name, False):
                    testcase.is_mandatory = True

            if is_hwasan:
                if test_display_name in disabled_tests.DISABLED_TESTS_HWASAN:
                    continue

            if not self.IsLtpBinaryExist(command):
                continue

            logging.info("[Parser] Adding test case %s.", testcase.fullname)
            # Some test cases contain semicolons in their commands,
            # and we replace them with &&
            command = command.replace(';', '&amp;&amp;')
            # Replace the original command with '/data/local/tmp/ltp'
            # e.g. mm.mmapstress07
            command = command.replace(ltp_configs.LTPDIR, '&ltp_dir;')
            ltp_test_line = ltp_test_template % (test_display_name, command)
            if testcase.is_mandatory:
                mandatory_test_cases.append(ltp_test_line)
            else:
                skippable_test_cases.append(ltp_test_line)

        target, module = self._GetTargetAndModule(arch, n_bit, is_low_mem,
                                                  is_hwasan)

        config_lines = config_lines.format(
            target=target,
            module_controller_option=module_controller_option,
            mandatory_test_cases='\n'.join(mandatory_test_cases),
            skippable_test_cases='\n'.join(skippable_test_cases),
            MODULE=module)
        # Inputs are decoded as UTF-8, so write the config back as UTF-8
        # regardless of the host locale.
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(config_lines)

    def GenerateLtpTestCases(self, testsuite: str, disabled_tests_list: List[str]) -> List[str]:
        '''Generate test cases for each ltp test suite.

        Args:
            testsuite: string, test suite name; also the name of the resource
                read from the 'runtest' package
            disabled_tests_list: collection of string, test names that get
                the 'DISABLED_' prefix

        Returns:
            A list of string, one tab separated line per test case in format
            "test suite<TAB>test name<TAB>test command"

        Raises:
            FileNotFoundError: if the bundled runtest file cannot be loaded.
        '''
        result = []
        for line in self._ReadResource('runtest', testsuite).splitlines():
            line = line.strip()
            if not line or line.startswith('#'):
                continue

            # First word is the test name, the remainder is its command.
            parts = line.split(None, 1)
            testname = parts[0]
            command = parts[1].strip() if len(parts) > 1 else ''
            testname_prefix = ('DISABLED_'
                               if testname in disabled_tests_list else '')

            result.append("\t".join(
                [testsuite, testname_prefix + testname, command]))
        return result

    def GenerateLtpRunScript(self, scenario_groups: List[str]) -> List[str]:
        '''Given a scenario group generate test case script.

        Args:
            scenario_groups: list of string, name of test scenario groups to use

        Returns:
            A list of string

        Raises:
            FileNotFoundError: if a bundled resource cannot be loaded.
        '''
        disabled_text = self._ReadResource('android.tools',
                                           'disabled_tests.txt')
        # Keep non-empty, non-comment lines; a set gives fast membership tests.
        disabled_tests_list = {
            name
            for name in (raw.strip() for raw in disabled_text.splitlines())
            if name and not name.startswith('#')
        }

        result = []
        for testsuite in scenario_groups:
            result.extend(
                self.GenerateLtpTestCases(testsuite, disabled_tests_list))
        return result
323