xref: /aosp_15_r20/external/autotest/server/site_tests/tast/tast_unittest.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1#!/usr/bin/python3
2# -*- coding: utf-8 -*-
3# Copyright 2018 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7from __future__ import absolute_import
8from __future__ import division
9from __future__ import print_function
10
11import datetime
12import json
13import os
14import shutil
15import tempfile
16import unittest
17import six
18import yaml
19
20import dateutil.parser
21
22import common
23from autotest_lib.server.site_tests.tast import tast
24from autotest_lib.client.common_lib import base_job
25from autotest_lib.client.common_lib import error
26from autotest_lib.client.common_lib import utils
27from autotest_lib.server.cros.network import wifi_test_context_manager
28from autotest_lib.server.hosts import host_info
29from autotest_lib.server.hosts import servo_constants
30
31
32# Arbitrary base time to use in tests.
33BASE_TIME = dateutil.parser.parse('2018-01-01T00:00:00Z')
34
35# Arbitrary fixed time to use in place of time.time() when running tests.
36NOW = BASE_TIME + datetime.timedelta(0, 60)
37
38
39class TastTest(unittest.TestCase):
40    """Tests the tast.tast Autotest server test.
41
42    This unit test verifies interactions between the tast.py Autotest server
43    test and the 'tast' executable that's actually responsible for running
44    individual Tast tests and reporting their results. To do that, it sets up a
45    fake environment in which it can run the Autotest test, including a fake
46    implementation of the 'tast' executable provided by testdata/fake_tast.py.
47    """
48
49    # Arbitrary data to pass to the tast command.
50    HOST = 'dut.example.net'
51    PORT = 22
52    TEST_PATTERNS = ['(bvt)']
53    MAX_RUN_SEC = 300
54
55    # Default paths where Tast files are installed by Portage packages.
56    _PORTAGE_TAST_PATH = tast.tast._PORTAGE_TAST_PATH
57    _PORTAGE_REMOTE_BUNDLE_DIR = '/usr/libexec/tast/bundles/remote'
58    _PORTAGE_REMOTE_DATA_DIR = '/usr/share/tast/data'
59    _PORTAGE_REMOTE_TEST_RUNNER_PATH = '/usr/bin/remote_test_runner'
60
61    def setUp(self):
62        self._temp_dir = tempfile.mkdtemp('.tast_unittest')
63
64        def make_subdir(subdir):
65            # pylint: disable=missing-docstring
66            path = os.path.join(self._temp_dir, subdir)
67            os.mkdir(path)
68            return path
69
70        self._job = FakeServerJob(make_subdir('job'), make_subdir('tmp'))
71        self._bin_dir = make_subdir('bin')
72        self._out_dir = make_subdir('out')
73        self._root_dir = make_subdir('root')
74        self._set_up_root()
75
76        self._test = tast.tast(self._job, self._bin_dir, self._out_dir)
77        self._host = FakeHost(self.HOST, self.PORT)
78
79        self._test_patterns = []
80        self._tast_commands = {}
81
82    def tearDown(self):
83        shutil.rmtree(self._temp_dir)
84
85    def _get_path_in_root(self, orig_path):
86        """Appends a path to self._root_dir (which stores Tast-related files).
87
88        @param orig_path: Path to append, e.g. '/usr/bin/tast'.
89        @return: Path within the root dir, e.g. '/path/to/root/usr/bin/tast'.
90        """
91        return os.path.join(self._root_dir, os.path.relpath(orig_path, '/'))
92
93    def _set_up_root(self, ssp=False):
94        """Creates Tast-related files and dirs within self._root_dir.
95
96        @param ssp: If True, install files to locations used with Server-Side
97            Packaging. Otherwise, install to locations used by Portage packages.
98        """
99        def create_file(orig_dest, src=None):
100            """Creates a file under self._root_dir.
101
102            @param orig_dest: Original absolute path, e.g. "/usr/bin/tast".
103            @param src: Absolute path to file to copy, or none to create empty.
104            @return: Absolute path to created file.
105            """
106            dest = self._get_path_in_root(orig_dest)
107            if not os.path.exists(os.path.dirname(dest)):
108                os.makedirs(os.path.dirname(dest))
109            if src:
110                shutil.copyfile(src, dest)
111                shutil.copymode(src, dest)
112            else:
113                open(dest, 'a').close()
114            return dest
115
116        # Copy fake_tast.py to the usual location for the 'tast' executable.
117        # The remote bundle dir and remote_test_runner just need to exist so
118        # tast.py can find them; their contents don't matter since fake_tast.py
119        # won't actually use them.
120        self._tast_path = create_file(
121                tast.tast._SSP_TAST_PATH if ssp else self._PORTAGE_TAST_PATH,
122                os.path.join(os.path.dirname(os.path.realpath(__file__)),
123                             'testdata', 'fake_tast.py'))
124        self._remote_bundle_dir = os.path.dirname(
125                create_file(os.path.join(tast.tast._SSP_REMOTE_BUNDLE_DIR if ssp
126                                         else self._PORTAGE_REMOTE_BUNDLE_DIR,
127                                         'fake')))
128        self._remote_data_dir = os.path.dirname(
129                create_file(os.path.join(tast.tast._SSP_REMOTE_DATA_DIR if ssp
130                                         else self._PORTAGE_REMOTE_DATA_DIR,
131                                         'fake')))
132        self._remote_test_runner_path = create_file(
133                tast.tast._SSP_REMOTE_TEST_RUNNER_PATH if ssp
134                else self._PORTAGE_REMOTE_TEST_RUNNER_PATH)
135
136    def _init_tast_commands(self,
137                            tests,
138                            ssp=False,
139                            build=False,
140                            build_bundle='fakebundle',
141                            run_private_tests=False,
142                            run_vars=[],
143                            run_varsfiles=[],
144                            download_data_lazily=False,
145                            totalshards=1,
146                            shardindex=0,
147                            companion_duts={},
148                            maybemissingvars='',
149                            port=True,
150                            test_filter_files=[]):
151        """Sets fake_tast.py's behavior for 'list' and 'run' commands.
152
153        @param tests: List of TestInfo objects.
154        @param run_private_tests: Whether to run private tests.
155        @param run_vars: List of string values that should be passed to 'run'
156            via -var.
157        @param run_varsfiles: filenames should be passed to 'run' via -varsfile.
158        @param download_data_lazily: Whether to download external data files
159            lazily.
160        @param totalshards: total number of shards.
161        @param shardindex: shard index to be run.
162        @param companion_duts: mapping between roles and DUTs.
163        @param test_filter_files: a list of files specify which tests to disable.
164        """
165        list_args = [
166                'build=%s' % build,
167                'patterns=%s' % self.TEST_PATTERNS,
168                'sshretries=%d' % tast.tast._SSH_CONNECT_RETRIES,
169                'downloaddata=%s' %
170                ('lazy' if download_data_lazily else 'batch'),
171                'totalshards=%d' % totalshards,
172                'shardindex=%d' % shardindex,
173                'target=%s%s' % (self.HOST, ':%d' % self.PORT if port else ''),
174                'verbose=True',
175        ]
176        if build:
177            list_args.extend([
178                'buildbundle=%s' % build_bundle,
179                'checkbuilddeps=False',
180            ])
181        else:
182            if ssp:
183                list_args.extend([
184                    'remotebundledir=%s' % self._remote_bundle_dir,
185                    'remotedatadir=%s' % self._remote_data_dir,
186                    'remoterunner=%s' % self._remote_test_runner_path,
187                ])
188            else:
189                list_args.extend([
190                    'remotebundledir=None',
191                    'remotedatadir=None',
192                    'remoterunner=None',
193                ])
194            list_args.append('downloadprivatebundles=%s' % run_private_tests)
195        run_args = list_args + [
196            'resultsdir=%s' % self._test.resultsdir,
197            'continueafterfailure=True',
198            'var=%s' % run_vars,
199        ]
200        if run_varsfiles:
201            run_args.append('varsfile=%s' % run_varsfiles)
202        if companion_duts:
203            role_dut_pairs = []
204            for role, dut in sorted(companion_duts.items()):
205                role_dut_pairs.append('%s:%s%s' %
206                                      (role, dut.hostname,
207                                       ':%d' % dut.port if dut.port else ''))
208            run_args.append('companiondut=%s' % role_dut_pairs)
209        if test_filter_files:
210            run_args.append('testfilterfile=%s' % test_filter_files)
211        test_list = json.dumps([t.test() for t in tests])
212        run_files = {
213            self._results_path(): ''.join(
214                    [json.dumps(t.test_result()) + '\n'
215                     for t in tests if t.start_time()]),
216        }
217        self._tast_commands = {
218            'list': TastCommand(list_args, stdout=test_list),
219            'run': TastCommand(run_args, files_to_write=run_files),
220        }
221
222    def _results_path(self):
223        """Returns the path where "tast run" writes streamed results.
224
225        @return Path to streamed results file.
226        """
227        return os.path.join(self._test.resultsdir,
228                            tast.tast._STREAMED_RESULTS_FILENAME)
229
230    def _run_test(self,
231                  ignore_test_failures=False,
232                  command_args=[],
233                  ssp=False,
234                  build=False,
235                  build_bundle='fakebundle',
236                  run_private_tests=False,
237                  varsfiles=[],
238                  download_data_lazily=False,
239                  clear_tpm=False,
240                  totalshards=1,
241                  shardindex=0,
242                  companion_duts={},
243                  varslist=[],
244                  maybemissingvars='',
245                  use_camera_box=False,
246                  vars_gs_path='',
247                  test_filter_files=[],
248                  report_skipped=False):
249        """Writes fake_tast.py's configuration and runs the test.
250
251        @param ignore_test_failures: Passed as the identically-named arg to
252            Tast.initialize().
253        @param command_args: Passed as the identically-named arg to
254            Tast.initialize().
255        @param ssp: Passed as the identically-named arg to Tast.initialize().
256        @param build: Passed as the identically-named arg to Tast.initialize().
257        @param build_bundle: Passed as the identically-named arg to
258            Tast.initialize().
259        @param run_private_tests: Passed as the identically-named arg to
260            Tast.initialize().
261        @param varsfiles: list of names of yaml files containing variables set
262             in |-varsfile| arguments.
263        @param download_data_lazily: Whether to download external data files
264            lazily.
265        @param clear_tpm: clear the TPM first before running the tast tests.
266        @param varslist: list of strings to pass to tast run command as |-vars|
267            arguments. Each string should be formatted as "name=value".
268        @param maybemissingvars: a regex to pass to tast run command as
269            |-maybemissingvars| arguments.
270        @param use_camera_box: Whether the test run in CameraBox.
271        @param report_skipped: Whether or not skipped tests should be reported.
272        """
273        self._test.initialize(self._host,
274                              self.TEST_PATTERNS,
275                              ignore_test_failures=ignore_test_failures,
276                              max_run_sec=self.MAX_RUN_SEC,
277                              command_args=command_args,
278                              install_root=self._root_dir,
279                              ssp=ssp,
280                              build=build,
281                              build_bundle=build_bundle,
282                              run_private_tests=run_private_tests,
283                              varsfiles=varsfiles,
284                              download_data_lazily=download_data_lazily,
285                              clear_tpm=clear_tpm,
286                              totalshards=totalshards,
287                              shardindex=shardindex,
288                              companion_duts=companion_duts,
289                              varslist=varslist,
290                              maybemissingvars=maybemissingvars,
291                              use_camera_box=use_camera_box,
292                              vars_gs_path=vars_gs_path,
293                              test_filter_files=test_filter_files,
294                              report_skipped=report_skipped)
295        self._test.set_fake_now_for_testing(
296                (NOW - tast._UNIX_EPOCH).total_seconds())
297
298        cfg = {}
299        for name, cmd in six.iteritems(self._tast_commands):
300            cfg[name] = vars(cmd)
301        path = os.path.join(os.path.dirname(self._tast_path), 'config.json')
302        with open(path, 'a') as f:
303            json.dump(cfg, f)
304
305        try:
306            self._test.run_once()
307        finally:
308            if self._job.post_run_hook:
309                self._job.post_run_hook()
310
311    def _run_test_for_failure(self, failed, missing):
312        """Calls _run_test and checks the resulting failure message.
313
314        @param failed: List of TestInfo objects for expected-to-fail tests.
315        @param missing: List of TestInfo objects for expected-missing tests.
316        """
317        with self.assertRaises(error.TestFail) as cm:
318            self._run_test()
319
320        msg = self._test._get_failure_message([t.name() for t in failed],
321                                              [t.name() for t in missing])
322        self.assertEqual(msg, str(cm.exception))
323
324    def _load_job_keyvals(self):
325        """Loads job keyvals.
326
327        @return Keyvals as a str-to-str dict, or None if keyval file is missing.
328        """
329        if not os.path.exists(os.path.join(self._job.resultdir,
330                                           'keyval')):
331            return None
332        return utils.read_keyval(self._job.resultdir)
333
334    def testPassingTests(self):
335        """Tests that passing tests are reported correctly."""
336        tests = [TestInfo('pkg.Test1', 0, 2),
337                 TestInfo('pkg.Test2', 3, 5),
338                 TestInfo('pkg.Test3', 6, 8)]
339        self._init_tast_commands(tests)
340        self._run_test()
341        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
342                         status_string(self._job.status_entries))
343        self.assertIs(self._load_job_keyvals(), None)
344
345    def testPassingTestsNoPort(self):
346        """Tests that passing tests are reported correctly."""
347        self._host = FakeHost(self.HOST, None)
348        tests = [
349                TestInfo('pkg.Test1', 0, 2),
350                TestInfo('pkg.Test2', 3, 5),
351                TestInfo('pkg.Test3', 6, 8)
352        ]
353        self._init_tast_commands(tests, port=None)
354        self._run_test()
355        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
356                         status_string(self._job.status_entries))
357        self.assertIs(self._load_job_keyvals(), None)
358
359    def testFailingTests(self):
360        """Tests that failing tests are reported correctly."""
361        tests = [TestInfo('pkg.Test1', 0, 2, errors=[('failed', 1)]),
362                 TestInfo('pkg.Test2', 3, 6),
363                 TestInfo('pkg.Test3', 7, 8, errors=[('another', 7)])]
364        self._init_tast_commands(tests)
365        self._run_test_for_failure([tests[0], tests[2]], [])
366        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
367                         status_string(self._job.status_entries))
368        self.assertIs(self._load_job_keyvals(), None)
369
370    def testIgnoreTestFailures(self):
371        """Tests that tast.tast can still pass with Tast test failures."""
372        tests = [TestInfo('pkg.Test', 0, 2, errors=[('failed', 1)])]
373        self._init_tast_commands(tests)
374        self._run_test(ignore_test_failures=True)
375        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
376                         status_string(self._job.status_entries))
377
378    def testSkippedTest(self):
379        """Tests that skipped tests aren't reported."""
380        tests = [
381                TestInfo('pkg.Normal', 0, 1),
382                TestInfo('pkg.Skipped', 2, 2, skip_reason='missing deps')
383        ]
384        self._init_tast_commands(tests)
385        self._run_test()
386        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
387                         status_string(self._job.status_entries))
388        self.assertIs(self._load_job_keyvals(), None)
389
390    def testSkippedTestWithReportSkipped(self):
391        """Tests that skipped tests are reported correctly when report_skipped=True."""
392        tests = [
393                TestInfo('pkg.Normal', 0, 1),
394                TestInfo('pkg.Skipped',
395                         2,
396                         3,
397                         skip_reason='missing deps',
398                         report_skipped=True)
399        ]
400        self._init_tast_commands(tests)
401        self._run_test(report_skipped=True)
402        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
403                         status_string(self._job.status_entries))
404        self.assertIs(self._load_job_keyvals(), None)
405
406    def testSkippedTestWithErrors(self):
407        """Tests that skipped tests are reported if they also report errors."""
408        tests = [TestInfo('pkg.Normal', 0, 1),
409                 TestInfo('pkg.SkippedWithErrors', 2, 2, skip_reason='bad deps',
410                          errors=[('bad deps', 2)])]
411        self._init_tast_commands(tests)
412        self._run_test_for_failure([tests[1]], [])
413        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
414                         status_string(self._job.status_entries))
415        self.assertIs(self._load_job_keyvals(), None)
416
417    def testMissingTests(self):
418        """Tests that missing tests are reported when there's another test."""
419        tests = [TestInfo('pkg.Test1', None, None),
420                 TestInfo('pkg.Test2', 0, 2),
421                 TestInfo('pkg.Test3', None, None)]
422        self._init_tast_commands(tests)
423        self._run_test_for_failure([], [tests[0], tests[2]])
424        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
425                         status_string(self._job.status_entries))
426        self.assertEqual(self._load_job_keyvals(),
427                         {'tast_missing_test.0': 'pkg.Test1',
428                          'tast_missing_test.1': 'pkg.Test3'})
429
430    def testNoTestsRun(self):
431        """Tests that a missing test is reported when it's the only test."""
432        tests = [TestInfo('pkg.Test', None, None)]
433        self._init_tast_commands(tests)
434        self._run_test_for_failure([], [tests[0]])
435        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
436                         status_string(self._job.status_entries))
437        self.assertEqual(self._load_job_keyvals(),
438                         {'tast_missing_test.0': 'pkg.Test'})
439
440    def testHangingTest(self):
441        """Tests that a not-finished test is reported."""
442        tests = [TestInfo('pkg.Test1', 0, 2),
443                 TestInfo('pkg.Test2', 3, None),
444                 TestInfo('pkg.Test3', None, None)]
445        self._init_tast_commands(tests)
446        self._run_test_for_failure([tests[1]], [tests[2]])
447        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
448                         status_string(self._job.status_entries))
449        self.assertEqual(self._load_job_keyvals(),
450                         {'tast_missing_test.0': 'pkg.Test3'})
451
452    def testRunError(self):
453        """Tests that a run error is reported for a non-finished test."""
454        tests = [TestInfo('pkg.Test1', 0, 2),
455                 TestInfo('pkg.Test2', 3, None),
456                 TestInfo('pkg.Test3', None, None)]
457        self._init_tast_commands(tests)
458
459        # Simulate the run being aborted due to a lost SSH connection.
460        path = os.path.join(self._test.resultsdir,
461                            tast.tast._RUN_ERROR_FILENAME)
462        msg = 'Lost SSH connection to DUT'
463        self._tast_commands['run'].files_to_write[path] = msg
464        self._tast_commands['run'].status = 1
465
466        self._run_test_for_failure([tests[1]], [tests[2]])
467        self.assertEqual(
468                status_string(get_status_entries_from_tests(tests, msg)),
469                status_string(self._job.status_entries))
470        self.assertEqual(self._load_job_keyvals(),
471                         {'tast_missing_test.0': 'pkg.Test3'})
472
473    def testNoTestsMatched(self):
474        """Tests that no error is raised if no tests are matched."""
475        self._init_tast_commands([])
476        self._run_test()
477
478    def testListCommandFails(self):
479        """Tests that an error is raised if the list command fails."""
480        self._init_tast_commands([])
481
482        # The list subcommand writes log messages to stderr on failure.
483        FAILURE_MSG = "failed to connect"
484        self._tast_commands['list'].status = 1
485        self._tast_commands['list'].stdout = None
486        self._tast_commands['list'].stderr = 'blah blah\n%s\n' % FAILURE_MSG
487
488        # The first line of the exception should include the last line of output
489        # from tast.
490        with self.assertRaises(error.TestFail) as cm:
491            self._run_test()
492        first_line = str(cm.exception).split('\n')[0]
493        self.assertTrue(FAILURE_MSG in first_line,
494                        '"%s" not in "%s"' % (FAILURE_MSG, first_line))
495
496    def testListCommandPrintsGarbage(self):
497        """Tests that an error is raised if the list command prints bad data."""
498        self._init_tast_commands([])
499        self._tast_commands['list'].stdout = 'not valid JSON data'
500        with self.assertRaises(error.TestFail) as _:
501            self._run_test()
502
503    def testRunCommandFails(self):
504        """Tests that an error is raised if the run command fails."""
505        tests = [TestInfo('pkg.Test1', 0, 1), TestInfo('pkg.Test2', 2, 3)]
506        self._init_tast_commands(tests)
507        FAILURE_MSG = "this is the failure"
508        self._tast_commands['run'].status = 1
509        self._tast_commands['run'].stdout = 'blah blah\n%s\n' % FAILURE_MSG
510
511        with self.assertRaises(error.TestFail) as cm:
512            self._run_test()
513        self.assertTrue(FAILURE_MSG in str(cm.exception),
514                        '"%s" not in "%s"' % (FAILURE_MSG, str(cm.exception)))
515        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
516                         status_string(self._job.status_entries))
517
518    def testRunCommandWritesTrailingGarbage(self):
519        """Tests that an error is raised if the run command prints bad data."""
520        tests = [TestInfo('pkg.Test1', 0, 1), TestInfo('pkg.Test2', None, None)]
521        self._init_tast_commands(tests)
522        self._tast_commands['run'].files_to_write[self._results_path()] += \
523                'not valid JSON data'
524        with self.assertRaises(error.TestFail) as _:
525            self._run_test()
526        # Missing tests are reported in the _parse_results which is called after
527        # _run_tests, so missing tests is not available and the status_entries
528        # should include only the first test case.
529        self.assertEqual(
530                status_string(get_status_entries_from_tests(tests[:1])),
531                status_string(self._job.status_entries))
532
533    def testRunCommandWithSharding(self):
534        """Tests that sharding parameter is passing thru without issues."""
535        tests = [TestInfo('pkg.Test1', 0, 2), TestInfo('pkg.Test2', 3, 5)]
536        self._init_tast_commands(tests=tests, totalshards=2, shardindex=1)
537        self._run_test(totalshards=2, shardindex=1)
538
539    def testRunCommandWithCompanionDUTs(self):
540        """Tests that companion dut parameter is passing thru without issues."""
541        tests = [TestInfo('pkg.Test1', 0, 2), TestInfo('pkg.Test2', 3, 5)]
542        companion_duts = {'role1': FakeHost('dut1', 22), 'role2':FakeHost('dut2', 22)}
543        self._init_tast_commands(tests=tests, companion_duts=companion_duts)
544        self._run_test(companion_duts=companion_duts)
545
546    def testRunCommandWithCompanionDUTsNoPort(self):
547        """Tests that companion dut parameter is passing thru without issues."""
548        tests = [TestInfo('pkg.Test1', 0, 2), TestInfo('pkg.Test2', 3, 5)]
549        companion_duts = {
550                'role1': FakeHost('dut1', 22),
551                'role2': FakeHost('dut2', None)
552        }
553        self._init_tast_commands(tests=tests, companion_duts=companion_duts)
554        self._run_test(companion_duts=companion_duts)
555
556    def testRunCommandWithTestFilterFiles(self):
557        """Tests that companion dut parameter is passing thru without issues."""
558        tests = [TestInfo('pkg.Test1', 0, 2), TestInfo('pkg.Test2', 3, 5)]
559        test_filter_files = ['filter_1.txt', 'filter_2.txt']
560        self._init_tast_commands(tests=tests,
561                                 test_filter_files=test_filter_files)
562        self._run_test(test_filter_files=test_filter_files)
563
564    def testNoResultsFile(self):
565        """Tests that an error is raised if no results file is written."""
566        tests = [TestInfo('pkg.Test1', None, None)]
567        self._init_tast_commands(tests)
568        self._tast_commands['run'].files_to_write = {}
569        with self.assertRaises(error.TestFail) as _:
570            self._run_test()
571        # Missing tests are reported in the _parse_results which is called after
572        # _run_tests, so missing tests is not available and the status_entries
573        # should be empty.
574        self.assertEqual(status_string(get_status_entries_from_tests([])),
575                         status_string(self._job.status_entries))
576
577    def testNoResultsFileAfterRunCommandFails(self):
578        """Tests that stdout is included in error after missing results."""
579        tests = [TestInfo('pkg.Test1', None, None)]
580        self._init_tast_commands(tests)
581        FAILURE_MSG = "this is the failure"
582        self._tast_commands['run'].status = 1
583        self._tast_commands['run'].files_to_write = {}
584        self._tast_commands['run'].stdout = 'blah blah\n%s\n' % FAILURE_MSG
585
586        # The first line of the exception should include the last line of output
587        # from tast rather than a message about the missing results file.
588        with self.assertRaises(error.TestFail) as cm:
589            self._run_test()
590        first_line = str(cm.exception).split('\n')[0]
591        self.assertTrue(FAILURE_MSG in first_line,
592                        '"%s" not in "%s"' % (FAILURE_MSG, first_line))
593        # Missing tests are reported in the _parse_results which is called after
594        # _run_tests, so missing tests is not available and the status_entries
595        # should be empty.
596        self.assertEqual(status_string(get_status_entries_from_tests([])),
597                         status_string(self._job.status_entries))
598
599    def testMissingTastExecutable(self):
600        """Tests that an error is raised if the tast command isn't found."""
601        os.remove(self._get_path_in_root(self._PORTAGE_TAST_PATH))
602        with self.assertRaises(error.TestFail) as _:
603            self._run_test()
604
605    def testMissingRemoteTestRunner(self):
606        """Tests that an error is raised if remote_test_runner isn't found."""
607        os.remove(self._get_path_in_root(self._PORTAGE_REMOTE_TEST_RUNNER_PATH))
608        with self.assertRaises(error.TestFail) as _:
609            self._run_test()
610
611    def testMissingRemoteBundleDir(self):
612        """Tests that an error is raised if remote bundles aren't found."""
613        shutil.rmtree(self._get_path_in_root(self._PORTAGE_REMOTE_BUNDLE_DIR))
614        with self.assertRaises(error.TestFail) as _:
615            self._run_test()
616
617    def testSspPaths(self):
618        """Tests that files can be located at their alternate SSP locations."""
619        for p in os.listdir(self._root_dir):
620            shutil.rmtree(os.path.join(self._root_dir, p))
621        self._set_up_root(ssp=True)
622
623        tests = [TestInfo('pkg.Test', 0, 1)]
624        self._init_tast_commands(tests, ssp=True)
625        self._run_test(ssp=True)
626        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
627                         status_string(self._job.status_entries))
628
629    def testBuild(self):
630        """Tests that Tast tests can be built."""
631        tests = [TestInfo('pkg.Test', 0, 1)]
632        self._init_tast_commands(tests, build=True)
633        self._run_test(build=True)
634        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
635                         status_string(self._job.status_entries))
636
637    def testFailureMessage(self):
638        """Tests that appropriate failure messages are generated."""
639        # Just do this to initialize the self._test.
640        self._init_tast_commands([TestInfo('pkg.Test', 0, 0)])
641        self._run_test()
642
643        msg = lambda f, m: self._test._get_failure_message(f, m)
644        self.assertEqual('', msg([], []))
645        self.assertEqual('1 failed: t1', msg(['t1'], []))
646        self.assertEqual('2 failed: t1 t2', msg(['t1', 't2'], []))
647        self.assertEqual('1 missing: t1', msg([], ['t1']))
648        self.assertEqual('1 failed: t1; 1 missing: t2', msg(['t1'], ['t2']))
649
650    def testFailureMessageIgnoreTestFailures(self):
651        """Tests that test failures are ignored in messages when requested."""
652        # Just do this to initialize the self._test.
653        self._init_tast_commands([TestInfo('pkg.Test', 0, 0)])
654        self._run_test(ignore_test_failures=True)
655
656        msg = lambda f, m: self._test._get_failure_message(f, m)
657        self.assertEqual('', msg([], []))
658        self.assertEqual('', msg(['t1'], []))
659        self.assertEqual('1 missing: t1', msg([], ['t1']))
660        self.assertEqual('1 missing: t2', msg(['t1'], ['t2']))
661
662    def testNonAsciiFailureMessage(self):
663        """Tests that non-ascii failure message should be handled correctly"""
664        tests = [TestInfo('pkg.Test', 0, 2, errors=[('失敗', 1)])]
665        self._init_tast_commands(tests)
666        self._run_test(ignore_test_failures=True)
667        self.assertEqual(status_string(get_status_entries_from_tests(tests)),
668                         status_string(self._job.status_entries))
669
670    def testRunPrivateTests(self):
671        """Tests running private tests."""
672        self._init_tast_commands([TestInfo('pkg.Test', 0, 0)],
673                                 run_private_tests=True)
674        self._run_test(ignore_test_failures=True, run_private_tests=True)
675
676    def testDownloadDataLazily(self):
677        """Tests downloading external data files lazily."""
678        self._init_tast_commands([TestInfo('pkg.Test', 0, 0)],
679                                 download_data_lazily=True)
680        self._run_test(ignore_test_failures=True, download_data_lazily=True)
681
682    def testServoFromCommandArgs(self):
683        """Tests passing servo info via command-line arg."""
684        SERVO_HOST = 'chromeos6-row2-rack21-labstation1.cros'
685        SERVO_PORT = '9995'
686
687        servo_var = 'servo=%s:%s' % (SERVO_HOST, SERVO_PORT)
688        self._init_tast_commands([TestInfo('pkg.Test', 0, 0)],
689                                 run_vars=[servo_var])
690
691        # Simulate servo info being passed on the command line via --args.
692        args = [
693            '%s=%s' % (servo_constants.SERVO_HOST_ATTR, SERVO_HOST),
694            '%s=%s' % (servo_constants.SERVO_PORT_ATTR, SERVO_PORT),
695        ]
696        self._run_test(command_args=args)
697
698    def testServoFromHostInfoStore(self):
699        """Tests getting servo info from the host."""
700        SERVO_HOST = 'chromeos6-row2-rack21-labstation1.cros'
701        SERVO_PORT = '9995'
702
703        servo_var = 'servo=%s:%s' % (SERVO_HOST, SERVO_PORT)
704        self._init_tast_commands([TestInfo('pkg.Test', 0, 0)],
705                                 run_vars=[servo_var])
706
707        # Simulate the host's servo info being stored in the Autotest DB.
708        attr = {
709            servo_constants.SERVO_HOST_ATTR: SERVO_HOST,
710            servo_constants.SERVO_PORT_ATTR: SERVO_PORT,
711        }
712        self._host.host_info_store.commit(host_info.HostInfo(attributes=attr))
713        self._run_test()
714
715    def testWificellArgs(self):
716        """Tests passing Wificell specific args into Tast runner."""
717        ROUTER_IP = '192.168.1.2:1234'
718        PCAP_IP = '192.168.1.3:2345'
719        wificell_var = [
720                'router=%s' % ROUTER_IP,
721                'pcap=%s' % PCAP_IP,
722                'routers=%s,%s' % (ROUTER_IP, PCAP_IP),
723        ]
724        self._init_tast_commands([TestInfo('pkg.Test', 0, 0)],
725                                 run_vars=wificell_var)
726
727        WiFiManager = wifi_test_context_manager.WiFiTestContextManager
728        arg_list = [
729            (WiFiManager.CMDLINE_ROUTER_ADDR, ROUTER_IP),
730            (WiFiManager.CMDLINE_PCAP_ADDR, PCAP_IP),
731        ]
732        args = [("%s=%s" % x) for x in arg_list]
733        self._run_test(command_args=args)
734
735    def testFirmwareArgs(self):
736        """Tests passing firmware specific args into Tast runner."""
737        vars = ['firmware.no_ec_sync=true']
738        self._init_tast_commands([TestInfo('pkg.Test', 0, 0)], run_vars=vars)
739
740        args = ['no_ec_sync=true']
741        self._run_test(command_args=args)
742
743    def testCameraboxArgs(self):
744        """Tests passing camerabox specific args into Tast runner."""
745        # Now it won't specify any chart IP address if it does not find a valid
746        # one.
747        vars = []
748        self._init_tast_commands([TestInfo('pkg.Test', 0, 0)], run_vars=vars)
749        self._run_test(use_camera_box=True)
750
751    def testVarsfileOption(self):
752        with tempfile.NamedTemporaryFile(
753                suffix='.yaml', dir=self._temp_dir) as temp_file:
754            yaml.dump({
755                    "var1": "val1",
756                    "var2": "val2"
757            },
758                      stream=temp_file,
759                      encoding='utf-8')
760            varsfiles = [temp_file.name]
761            self._init_tast_commands([TestInfo('pkg.Test', 0, 0)],
762                                     run_varsfiles=varsfiles)
763            self._run_test(varsfiles=varsfiles)
764
765    def testVarslistOption(self):
766        varslist = ["var1=val1", "var2=val2"]
767        self._init_tast_commands([TestInfo('pkg.Test', 0, 0)],
768                                 run_vars=varslist)
769        self._run_test(varslist=varslist)
770
771    def testMaybeMissingVarsOption(self):
772        arg = '.*\.Test'
773        self._init_tast_commands([TestInfo('pkg.Test', 0, 0)],
774                                 maybemissingvars=arg)
775        self._run_test(maybemissingvars=arg)
776
777    def testFillExtvars(self):
778        with tempfile.NamedTemporaryFile(suffix='.yaml',
779                                         dir=self._temp_dir) as temp_file:
780            yaml.dump({
781                    'var1': 'val1',
782                    'var2': 'val2'
783            },
784                      stream=temp_file,
785                      encoding='utf-8')
786
787            host = FakeHost(self.HOST, self.PORT)
788            host.host_info_store = host_info.InMemoryHostInfoStore(
789                    host_info.HostInfo(labels=[
790                            'plus', 'board:octopus', 'fleex', 'os:cros'
791                    ]))
792            self._test.initialize(
793                    host=host,
794                    test_exprs=self.TEST_PATTERNS,
795                    varsfiles=[temp_file.name],
796                    varslist=['var3=val3', 'var4=val4'],
797                    command_args=['arg1', 'arg2=arg2val', 'arg3:arg3val'])
798            self._test._tests_to_run = ['test1', 'test2']
799
800            self.maxDiff = None
801            self.assertDictEqual(
802                    {
803                            'var:var1': 'val1',
804                            'var:var2': 'val2',
805                            'var:var3': 'val3',
806                            'var:var4': 'val4',
807                            'tests:': 'test1\ntest2',
808                            'test:test1': '',
809                            'test:test2': '',
810                            'args:': 'arg1\narg2=arg2val\narg3:arg3val',
811                            'arg:arg1': 'arg1val',
812                            'arg:arg1': '',
813                            'arg:arg2': 'arg2val',
814                            'arg:arg3': 'arg3val',
815                            'labels:': 'plus\nboard:octopus\nfleex\nos:cros',
816                            'label:plus': '',
817                            'label:board': 'octopus',
818                            'label:fleex': '',
819                            'label:os': 'cros',
820                    }, self._test._fill_config_extvars())
821
822    def test_dut_server_arg(self):
823        # Test positive single case
824        cargs = [
825                'dut_servers=100.101.102.103:1111,', 'servo_host=localhost',
826                'servo_port=9999'
827        ]
828        dut_server = tast._dut_server_arg(cargs)
829        self.assertEqual(dut_server, '100.101.102.103:1111')
830
831        # Test no dut_servers provided case:
832        cargs = ['servo_host=localhost', 'servo_port=9999']
833        dut_server = tast._dut_server_arg(cargs)
834        self.assertEqual(dut_server, None)
835
836        # Test multiple dut_servers provided case:
837        cargs = [
838                'dut_servers=100.101.102.103:1111,localhost:1234',
839                'servo_host=localhost', 'servo_port=9999'
840        ]
841        dut_server = tast._dut_server_arg(cargs)
842        self.assertEqual(dut_server, '100.101.102.103:1111')
843
844
845class TestInfo:
846    """Wraps information about a Tast test.
847
848    This struct is used to:
849    - get test definitions printed by fake_tast.py's 'list' command
850    - get test results written by fake_tast.py's 'run' command
851    - get expected base_job.status_log_entry objects that unit tests compare
852      against what tast.Tast actually recorded
853    """
854
855    def __init__(self,
856                 name,
857                 start_offset,
858                 end_offset,
859                 errors=None,
860                 skip_reason=None,
861                 attr=None,
862                 timeout_ns=0,
863                 missing_reason=None,
864                 report_skipped=False):
865        """
866        @param name: Name of the test, e.g. 'ui.ChromeLogin'.
867        @param start_offset: Start time as int seconds offset from BASE_TIME,
868            or None to indicate that tast didn't report a result for this test.
869        @param end_offset: End time as int seconds offset from BASE_TIME, or
870            None to indicate that tast reported that this test started but not
871            that it finished.
872        @param errors: List of (string, int) tuples containing reasons and
873            seconds offsets of errors encountered while running the test, or
874            None if no errors were encountered.
875        @param skip_reason: Human-readable reason that the test was skipped, or
876            None to indicate that it wasn't skipped.
877        @param attr: List of string test attributes assigned to the test, or
878            None if no attributes are assigned.
879        @param timeout_ns: Test timeout in nanoseconds.
880        @param report_skipped: Decide if skipped tests should be reported
881        """
882        def from_offset(offset):
883            """Returns an offset from BASE_TIME.
884
885            @param offset: Offset as integer seconds.
886            @return: datetime.datetime object.
887            """
888            if offset is None:
889                return None
890            return BASE_TIME + datetime.timedelta(seconds=offset)
891
892        self._name = name
893        self._start_time = from_offset(start_offset)
894        self._end_time = from_offset(end_offset)
895        self._errors = (
896                [(e[0], from_offset(e[1])) for e in errors] if errors else [])
897        self._skip_reason = skip_reason
898        self._attr = list(attr) if attr else []
899        self._timeout_ns = timeout_ns
900        self._report_skipped = report_skipped
901
902    def name(self):
903        # pylint: disable=missing-docstring
904        return self._name
905
906    def start_time(self):
907        # pylint: disable=missing-docstring
908        return self._start_time
909
910    def test(self):
911        """Returns a test dict printed by the 'list' command.
912
913        @return: dict representing a Tast testing.Test struct.
914        """
915        return {
916            'name': self._name,
917            'attr': self._attr,
918            'timeout': self._timeout_ns,
919        }
920
921    def test_result(self):
922        """Returns a dict representing a result written by the 'run' command.
923
924        @return: dict representing a Tast TestResult struct.
925        """
926        return {
927            'name': self._name,
928            'start': to_rfc3339(self._start_time),
929            'end': to_rfc3339(self._end_time),
930            'errors': [{'reason': e[0], 'time': to_rfc3339(e[1])}
931                       for e in self._errors],
932            'skipReason': self._skip_reason,
933            'attr': self._attr,
934            'timeout': self._timeout_ns,
935        }
936
937    def status_entries(self, run_error_msg=None):
938        """Returns expected base_job.status_log_entry objects for this test.
939
940        @param run_error_msg: String containing run error message, or None if no
941            run error was encountered.
942        @return: List of Autotest base_job.status_log_entry objects.
943        """
944        # Deliberately-skipped tests shouldn't have status entries unless errors
945        # were also reported.
946        if not self._report_skipped and self._skip_reason and not self._errors:
947            return []
948
949        def make(status_code, dt, msg=''):
950            """Makes a base_job.status_log_entry.
951
952            @param status_code: String status code.
953            @param dt: datetime.datetime object containing entry time, and its
954                value should be None if the test is not supposed to be started
955            @param msg: String message (typically only set for errors).
956            @return: base_job.status_log_entry object.
957            """
958            timestamp = int(
959                    (dt - tast._UNIX_EPOCH).total_seconds()) if dt else None
960            return base_job.status_log_entry(
961                    status_code, None,
962                    tast.tast._TEST_NAME_PREFIX + self._name, msg, None,
963                    timestamp=timestamp)
964
965        entries = [make(tast.tast._JOB_STATUS_START, self._start_time)]
966
967        if not self._start_time:
968            if run_error_msg:
969                reason = '%s due to global error: %s' % (
970                        tast.tast._TEST_DID_NOT_RUN_MSG, run_error_msg)
971            else:
972                reason = tast.tast._TEST_DID_NOT_RUN_MSG
973
974            entries.append(make(tast.tast._JOB_STATUS_NOSTATUS, None, reason))
975            entries.append(make(tast.tast._JOB_STATUS_END_NOSTATUS, None))
976        elif self._end_time and self._skip_reason and not self._errors:
977            entries.append(
978                    make(tast.tast._JOB_STATUS_SKIP, self._end_time,
979                         self._skip_reason))
980            entries.append(make(tast.tast._JOB_STATUS_END_SKIP,
981                                self._end_time))
982        elif self._end_time and not self._errors:
983            entries.append(make(tast.tast._JOB_STATUS_GOOD, self._end_time))
984            entries.append(make(tast.tast._JOB_STATUS_END_GOOD, self._end_time))
985        else:
986            for e in self._errors:
987                entries.append(make(tast.tast._JOB_STATUS_FAIL, e[1], e[0]))
988            if not self._end_time:
989                # If the test didn't finish, the run error (if any) should be
990                # included.
991                if run_error_msg:
992                    entries.append(make(tast.tast._JOB_STATUS_FAIL,
993                                        self._start_time, run_error_msg))
994                entries.append(make(tast.tast._JOB_STATUS_FAIL,
995                                    self._start_time,
996                                    tast.tast._TEST_DID_NOT_FINISH_MSG))
997            entries.append(make(tast.tast._JOB_STATUS_END_FAIL,
998                                self._end_time or self._start_time or NOW))
999
1000        return entries
1001
1002
1003class FakeServerJob:
1004    """Fake implementation of server_job from server/server_job.py."""
1005    def __init__(self, result_dir, tmp_dir):
1006        self.pkgmgr = None
1007        self.autodir = None
1008        self.resultdir = result_dir
1009        self.tmpdir = tmp_dir
1010        self.post_run_hook = None
1011        self.status_entries = []
1012
1013    def add_post_run_hook(self, hook):
1014        """Stub implementation of server_job.add_post_run_hook."""
1015        self.post_run_hook = hook
1016
1017    def record_entry(self, entry, log_in_subdir=True):
1018        """Stub implementation of server_job.record_entry."""
1019        assert(not log_in_subdir)
1020        self.status_entries.append(entry)
1021
1022
1023class FakeHost:
1024    """Fake implementation of AbstractSSHHost from server/hosts/abstract_ssh.py.
1025    """
1026    def __init__(self, hostname, port):
1027        self.hostname = hostname
1028        self.port = port
1029        self.host_info_store = host_info.InMemoryHostInfoStore(None)
1030
1031
1032class TastCommand(object):
1033    """Args and behavior for fake_tast.py for a given command, e.g. "list"."""
1034
1035    def __init__(self, required_args, status=0, stdout=None, stderr=None,
1036                 files_to_write=None):
1037        """
1038        @param required_args: List of required args, each specified as
1039            'name=value'. Names correspond to argparse-provided names in
1040            fake_tast.py (typically just the flag name, e.g. 'build' or
1041            'resultsdir'). Values correspond to str() representations of the
1042            argparse-provided values.
1043        @param status: Status code for fake_tast.py to return.
1044        @param stdout: Data to write to stdout.
1045        @param stderr: Data to write to stderr.
1046        @param files_to_write: Dict mapping from paths of files to write to
1047            their contents, or None to not write any files.
1048        """
1049        self.required_args = required_args
1050        self.status = status
1051        self.stdout = stdout
1052        self.stderr = stderr
1053        self.files_to_write = files_to_write if files_to_write else {}
1054
1055
1056def to_rfc3339(t):
1057    """Returns an RFC3339 timestamp.
1058
1059    @param t: UTC datetime.datetime object or None for the zero time.
1060    @return: String RFC3339 time, e.g. '2018-01-02T02:34:28Z'.
1061    """
1062    if t is None:
1063        return '0001-01-01T00:00:00Z'
1064    assert(not t.utcoffset())
1065    return t.strftime('%Y-%m-%dT%H:%M:%SZ')
1066
1067
1068def get_status_entries_from_tests(tests, run_error_msg=None):
1069    """Returns a flattened list of status entries from TestInfo objects.
1070
1071    @param tests: List of TestInfo objects.
1072    @param run_error_msg: String containing run error message, or None if no
1073        run error was encountered.
1074    @return: Flattened list of base_job.status_log_entry objects produced by
1075        calling status_entries() on each TestInfo object.
1076    """
1077    return sum([t.status_entries(run_error_msg) for t in tests], [])
1078
1079
1080def status_string(entries):
1081    """Returns a string describing a list of base_job.status_log_entry objects.
1082
1083    @param entries: List of base_job.status_log_entry objects.
1084    @return: String containing space-separated representations of entries.
1085    """
1086    found_test_strings = []
1087    missing_test_strings = []
1088
1089    # For each missing test, there are three corresponding entries that we want
1090    # to put in missing_test_strings: "START", "NOSTATUS" and "END NOSTATUS".
1091    # We cannot tell if the test is missing in the "START" entry. Therefore,
1092    # we use missing_tests to keep track of all the missing tests.
1093    missing_tests = set(entry.operation for entry in entries
1094                        if entry.status_code == tast.tast._JOB_STATUS_NOSTATUS)
1095
1096    for entry in entries:
1097        message = entry.message
1098        if isinstance(message, six.binary_type):
1099            message = message.decode('utf-8')
1100        # Ignore timestamp for missing entry
1101        timestamp = entry.fields[base_job.status_log_entry.TIMESTAMP_FIELD]
1102        if entry.operation not in missing_tests:
1103            s = '[%s %s %s %s]' % (timestamp, entry.operation,
1104                                   entry.status_code, repr(message))
1105            found_test_strings.append(s)
1106        else:
1107            s = '[%s %s %s]' % (entry.operation, entry.status_code,
1108                                repr(message))
1109            missing_test_strings.append(s)
1110    return ' '.join(found_test_strings + missing_test_strings)
1111
1112
1113if __name__ == '__main__':
1114    unittest.main()
1115