1#!/usr/bin/python3 2# -*- coding: utf-8 -*- 3# Copyright 2018 The Chromium OS Authors. All rights reserved. 4# Use of this source code is governed by a BSD-style license that can be 5# found in the LICENSE file. 6 7from __future__ import absolute_import 8from __future__ import division 9from __future__ import print_function 10 11import datetime 12import json 13import os 14import shutil 15import tempfile 16import unittest 17import six 18import yaml 19 20import dateutil.parser 21 22import common 23from autotest_lib.server.site_tests.tast import tast 24from autotest_lib.client.common_lib import base_job 25from autotest_lib.client.common_lib import error 26from autotest_lib.client.common_lib import utils 27from autotest_lib.server.cros.network import wifi_test_context_manager 28from autotest_lib.server.hosts import host_info 29from autotest_lib.server.hosts import servo_constants 30 31 32# Arbitrary base time to use in tests. 33BASE_TIME = dateutil.parser.parse('2018-01-01T00:00:00Z') 34 35# Arbitrary fixed time to use in place of time.time() when running tests. 36NOW = BASE_TIME + datetime.timedelta(0, 60) 37 38 39class TastTest(unittest.TestCase): 40 """Tests the tast.tast Autotest server test. 41 42 This unit test verifies interactions between the tast.py Autotest server 43 test and the 'tast' executable that's actually responsible for running 44 individual Tast tests and reporting their results. To do that, it sets up a 45 fake environment in which it can run the Autotest test, including a fake 46 implementation of the 'tast' executable provided by testdata/fake_tast.py. 47 """ 48 49 # Arbitrary data to pass to the tast command. 50 HOST = 'dut.example.net' 51 PORT = 22 52 TEST_PATTERNS = ['(bvt)'] 53 MAX_RUN_SEC = 300 54 55 # Default paths where Tast files are installed by Portage packages. 56 _PORTAGE_TAST_PATH = tast.tast._PORTAGE_TAST_PATH 57 _PORTAGE_REMOTE_BUNDLE_DIR = '/usr/libexec/tast/bundles/remote' 58 _PORTAGE_REMOTE_DATA_DIR = '/usr/share/tast/data' 59 _PORTAGE_REMOTE_TEST_RUNNER_PATH = '/usr/bin/remote_test_runner' 60 61 def setUp(self): 62 self._temp_dir = tempfile.mkdtemp('.tast_unittest') 63 64 def make_subdir(subdir): 65 # pylint: disable=missing-docstring 66 path = os.path.join(self._temp_dir, subdir) 67 os.mkdir(path) 68 return path 69 70 self._job = FakeServerJob(make_subdir('job'), make_subdir('tmp')) 71 self._bin_dir = make_subdir('bin') 72 self._out_dir = make_subdir('out') 73 self._root_dir = make_subdir('root') 74 self._set_up_root() 75 76 self._test = tast.tast(self._job, self._bin_dir, self._out_dir) 77 self._host = FakeHost(self.HOST, self.PORT) 78 79 self._test_patterns = [] 80 self._tast_commands = {} 81 82 def tearDown(self): 83 shutil.rmtree(self._temp_dir) 84 85 def _get_path_in_root(self, orig_path): 86 """Appends a path to self._root_dir (which stores Tast-related files). 87 88 @param orig_path: Path to append, e.g. '/usr/bin/tast'. 89 @return: Path within the root dir, e.g. '/path/to/root/usr/bin/tast'. 90 """ 91 return os.path.join(self._root_dir, os.path.relpath(orig_path, '/')) 92 93 def _set_up_root(self, ssp=False): 94 """Creates Tast-related files and dirs within self._root_dir. 95 96 @param ssp: If True, install files to locations used with Server-Side 97 Packaging. Otherwise, install to locations used by Portage packages. 98 """ 99 def create_file(orig_dest, src=None): 100 """Creates a file under self._root_dir. 101 102 @param orig_dest: Original absolute path, e.g. "/usr/bin/tast". 103 @param src: Absolute path to file to copy, or none to create empty. 104 @return: Absolute path to created file. 105 """ 106 dest = self._get_path_in_root(orig_dest) 107 if not os.path.exists(os.path.dirname(dest)): 108 os.makedirs(os.path.dirname(dest)) 109 if src: 110 shutil.copyfile(src, dest) 111 shutil.copymode(src, dest) 112 else: 113 open(dest, 'a').close() 114 return dest 115 116 # Copy fake_tast.py to the usual location for the 'tast' executable. 117 # The remote bundle dir and remote_test_runner just need to exist so 118 # tast.py can find them; their contents don't matter since fake_tast.py 119 # won't actually use them. 120 self._tast_path = create_file( 121 tast.tast._SSP_TAST_PATH if ssp else self._PORTAGE_TAST_PATH, 122 os.path.join(os.path.dirname(os.path.realpath(__file__)), 123 'testdata', 'fake_tast.py')) 124 self._remote_bundle_dir = os.path.dirname( 125 create_file(os.path.join(tast.tast._SSP_REMOTE_BUNDLE_DIR if ssp 126 else self._PORTAGE_REMOTE_BUNDLE_DIR, 127 'fake'))) 128 self._remote_data_dir = os.path.dirname( 129 create_file(os.path.join(tast.tast._SSP_REMOTE_DATA_DIR if ssp 130 else self._PORTAGE_REMOTE_DATA_DIR, 131 'fake'))) 132 self._remote_test_runner_path = create_file( 133 tast.tast._SSP_REMOTE_TEST_RUNNER_PATH if ssp 134 else self._PORTAGE_REMOTE_TEST_RUNNER_PATH) 135 136 def _init_tast_commands(self, 137 tests, 138 ssp=False, 139 build=False, 140 build_bundle='fakebundle', 141 run_private_tests=False, 142 run_vars=[], 143 run_varsfiles=[], 144 download_data_lazily=False, 145 totalshards=1, 146 shardindex=0, 147 companion_duts={}, 148 maybemissingvars='', 149 port=True, 150 test_filter_files=[]): 151 """Sets fake_tast.py's behavior for 'list' and 'run' commands. 152 153 @param tests: List of TestInfo objects. 154 @param run_private_tests: Whether to run private tests. 155 @param run_vars: List of string values that should be passed to 'run' 156 via -var. 157 @param run_varsfiles: filenames should be passed to 'run' via -varsfile. 158 @param download_data_lazily: Whether to download external data files 159 lazily. 160 @param totalshards: total number of shards. 161 @param shardindex: shard index to be run. 162 @param companion_duts: mapping between roles and DUTs. 163 @param test_filter_files: a list of files specify which tests to disable. 164 """ 165 list_args = [ 166 'build=%s' % build, 167 'patterns=%s' % self.TEST_PATTERNS, 168 'sshretries=%d' % tast.tast._SSH_CONNECT_RETRIES, 169 'downloaddata=%s' % 170 ('lazy' if download_data_lazily else 'batch'), 171 'totalshards=%d' % totalshards, 172 'shardindex=%d' % shardindex, 173 'target=%s%s' % (self.HOST, ':%d' % self.PORT if port else ''), 174 'verbose=True', 175 ] 176 if build: 177 list_args.extend([ 178 'buildbundle=%s' % build_bundle, 179 'checkbuilddeps=False', 180 ]) 181 else: 182 if ssp: 183 list_args.extend([ 184 'remotebundledir=%s' % self._remote_bundle_dir, 185 'remotedatadir=%s' % self._remote_data_dir, 186 'remoterunner=%s' % self._remote_test_runner_path, 187 ]) 188 else: 189 list_args.extend([ 190 'remotebundledir=None', 191 'remotedatadir=None', 192 'remoterunner=None', 193 ]) 194 list_args.append('downloadprivatebundles=%s' % run_private_tests) 195 run_args = list_args + [ 196 'resultsdir=%s' % self._test.resultsdir, 197 'continueafterfailure=True', 198 'var=%s' % run_vars, 199 ] 200 if run_varsfiles: 201 run_args.append('varsfile=%s' % run_varsfiles) 202 if companion_duts: 203 role_dut_pairs = [] 204 for role, dut in sorted(companion_duts.items()): 205 role_dut_pairs.append('%s:%s%s' % 206 (role, dut.hostname, 207 ':%d' % dut.port if dut.port else '')) 208 run_args.append('companiondut=%s' % role_dut_pairs) 209 if test_filter_files: 210 run_args.append('testfilterfile=%s' % test_filter_files) 211 test_list = json.dumps([t.test() for t in tests]) 212 run_files = { 213 self._results_path(): ''.join( 214 [json.dumps(t.test_result()) + '\n' 215 for t in tests if t.start_time()]), 216 } 217 self._tast_commands = { 218 'list': TastCommand(list_args, stdout=test_list), 219 'run': TastCommand(run_args, files_to_write=run_files), 220 } 221 222 def _results_path(self): 223 """Returns the path where "tast run" writes streamed results. 224 225 @return Path to streamed results file. 226 """ 227 return os.path.join(self._test.resultsdir, 228 tast.tast._STREAMED_RESULTS_FILENAME) 229 230 def _run_test(self, 231 ignore_test_failures=False, 232 command_args=[], 233 ssp=False, 234 build=False, 235 build_bundle='fakebundle', 236 run_private_tests=False, 237 varsfiles=[], 238 download_data_lazily=False, 239 clear_tpm=False, 240 totalshards=1, 241 shardindex=0, 242 companion_duts={}, 243 varslist=[], 244 maybemissingvars='', 245 use_camera_box=False, 246 vars_gs_path='', 247 test_filter_files=[], 248 report_skipped=False): 249 """Writes fake_tast.py's configuration and runs the test. 250 251 @param ignore_test_failures: Passed as the identically-named arg to 252 Tast.initialize(). 253 @param command_args: Passed as the identically-named arg to 254 Tast.initialize(). 255 @param ssp: Passed as the identically-named arg to Tast.initialize(). 256 @param build: Passed as the identically-named arg to Tast.initialize(). 257 @param build_bundle: Passed as the identically-named arg to 258 Tast.initialize(). 259 @param run_private_tests: Passed as the identically-named arg to 260 Tast.initialize(). 261 @param varsfiles: list of names of yaml files containing variables set 262 in |-varsfile| arguments. 263 @param download_data_lazily: Whether to download external data files 264 lazily. 265 @param clear_tpm: clear the TPM first before running the tast tests. 266 @param varslist: list of strings to pass to tast run command as |-vars| 267 arguments. Each string should be formatted as "name=value". 268 @param maybemissingvars: a regex to pass to tast run command as 269 |-maybemissingvars| arguments. 270 @param use_camera_box: Whether the test run in CameraBox. 271 @param report_skipped: Whether or not skipped tests should be reported. 272 """ 273 self._test.initialize(self._host, 274 self.TEST_PATTERNS, 275 ignore_test_failures=ignore_test_failures, 276 max_run_sec=self.MAX_RUN_SEC, 277 command_args=command_args, 278 install_root=self._root_dir, 279 ssp=ssp, 280 build=build, 281 build_bundle=build_bundle, 282 run_private_tests=run_private_tests, 283 varsfiles=varsfiles, 284 download_data_lazily=download_data_lazily, 285 clear_tpm=clear_tpm, 286 totalshards=totalshards, 287 shardindex=shardindex, 288 companion_duts=companion_duts, 289 varslist=varslist, 290 maybemissingvars=maybemissingvars, 291 use_camera_box=use_camera_box, 292 vars_gs_path=vars_gs_path, 293 test_filter_files=test_filter_files, 294 report_skipped=report_skipped) 295 self._test.set_fake_now_for_testing( 296 (NOW - tast._UNIX_EPOCH).total_seconds()) 297 298 cfg = {} 299 for name, cmd in six.iteritems(self._tast_commands): 300 cfg[name] = vars(cmd) 301 path = os.path.join(os.path.dirname(self._tast_path), 'config.json') 302 with open(path, 'a') as f: 303 json.dump(cfg, f) 304 305 try: 306 self._test.run_once() 307 finally: 308 if self._job.post_run_hook: 309 self._job.post_run_hook() 310 311 def _run_test_for_failure(self, failed, missing): 312 """Calls _run_test and checks the resulting failure message. 313 314 @param failed: List of TestInfo objects for expected-to-fail tests. 315 @param missing: List of TestInfo objects for expected-missing tests. 316 """ 317 with self.assertRaises(error.TestFail) as cm: 318 self._run_test() 319 320 msg = self._test._get_failure_message([t.name() for t in failed], 321 [t.name() for t in missing]) 322 self.assertEqual(msg, str(cm.exception)) 323 324 def _load_job_keyvals(self): 325 """Loads job keyvals. 326 327 @return Keyvals as a str-to-str dict, or None if keyval file is missing. 328 """ 329 if not os.path.exists(os.path.join(self._job.resultdir, 330 'keyval')): 331 return None 332 return utils.read_keyval(self._job.resultdir) 333 334 def testPassingTests(self): 335 """Tests that passing tests are reported correctly.""" 336 tests = [TestInfo('pkg.Test1', 0, 2), 337 TestInfo('pkg.Test2', 3, 5), 338 TestInfo('pkg.Test3', 6, 8)] 339 self._init_tast_commands(tests) 340 self._run_test() 341 self.assertEqual(status_string(get_status_entries_from_tests(tests)), 342 status_string(self._job.status_entries)) 343 self.assertIs(self._load_job_keyvals(), None) 344 345 def testPassingTestsNoPort(self): 346 """Tests that passing tests are reported correctly.""" 347 self._host = FakeHost(self.HOST, None) 348 tests = [ 349 TestInfo('pkg.Test1', 0, 2), 350 TestInfo('pkg.Test2', 3, 5), 351 TestInfo('pkg.Test3', 6, 8) 352 ] 353 self._init_tast_commands(tests, port=None) 354 self._run_test() 355 self.assertEqual(status_string(get_status_entries_from_tests(tests)), 356 status_string(self._job.status_entries)) 357 self.assertIs(self._load_job_keyvals(), None) 358 359 def testFailingTests(self): 360 """Tests that failing tests are reported correctly.""" 361 tests = [TestInfo('pkg.Test1', 0, 2, errors=[('failed', 1)]), 362 TestInfo('pkg.Test2', 3, 6), 363 TestInfo('pkg.Test3', 7, 8, errors=[('another', 7)])] 364 self._init_tast_commands(tests) 365 self._run_test_for_failure([tests[0], tests[2]], []) 366 self.assertEqual(status_string(get_status_entries_from_tests(tests)), 367 status_string(self._job.status_entries)) 368 self.assertIs(self._load_job_keyvals(), None) 369 370 def testIgnoreTestFailures(self): 371 """Tests that tast.tast can still pass with Tast test failures.""" 372 tests = [TestInfo('pkg.Test', 0, 2, errors=[('failed', 1)])] 373 self._init_tast_commands(tests) 374 self._run_test(ignore_test_failures=True) 375 self.assertEqual(status_string(get_status_entries_from_tests(tests)), 376 status_string(self._job.status_entries)) 377 378 def testSkippedTest(self): 379 """Tests that skipped tests aren't reported.""" 380 tests = [ 381 TestInfo('pkg.Normal', 0, 1), 382 TestInfo('pkg.Skipped', 2, 2, skip_reason='missing deps') 383 ] 384 self._init_tast_commands(tests) 385 self._run_test() 386 self.assertEqual(status_string(get_status_entries_from_tests(tests)), 387 status_string(self._job.status_entries)) 388 self.assertIs(self._load_job_keyvals(), None) 389 390 def testSkippedTestWithReportSkipped(self): 391 """Tests that skipped tests are reported correctly when report_skipped=True.""" 392 tests = [ 393 TestInfo('pkg.Normal', 0, 1), 394 TestInfo('pkg.Skipped', 395 2, 396 3, 397 skip_reason='missing deps', 398 report_skipped=True) 399 ] 400 self._init_tast_commands(tests) 401 self._run_test(report_skipped=True) 402 self.assertEqual(status_string(get_status_entries_from_tests(tests)), 403 status_string(self._job.status_entries)) 404 self.assertIs(self._load_job_keyvals(), None) 405 406 def testSkippedTestWithErrors(self): 407 """Tests that skipped tests are reported if they also report errors.""" 408 tests = [TestInfo('pkg.Normal', 0, 1), 409 TestInfo('pkg.SkippedWithErrors', 2, 2, skip_reason='bad deps', 410 errors=[('bad deps', 2)])] 411 self._init_tast_commands(tests) 412 self._run_test_for_failure([tests[1]], []) 413 self.assertEqual(status_string(get_status_entries_from_tests(tests)), 414 status_string(self._job.status_entries)) 415 self.assertIs(self._load_job_keyvals(), None) 416 417 def testMissingTests(self): 418 """Tests that missing tests are reported when there's another test.""" 419 tests = [TestInfo('pkg.Test1', None, None), 420 TestInfo('pkg.Test2', 0, 2), 421 TestInfo('pkg.Test3', None, None)] 422 self._init_tast_commands(tests) 423 self._run_test_for_failure([], [tests[0], tests[2]]) 424 self.assertEqual(status_string(get_status_entries_from_tests(tests)), 425 status_string(self._job.status_entries)) 426 self.assertEqual(self._load_job_keyvals(), 427 {'tast_missing_test.0': 'pkg.Test1', 428 'tast_missing_test.1': 'pkg.Test3'}) 429 430 def testNoTestsRun(self): 431 """Tests that a missing test is reported when it's the only test.""" 432 tests = [TestInfo('pkg.Test', None, None)] 433 self._init_tast_commands(tests) 434 self._run_test_for_failure([], [tests[0]]) 435 self.assertEqual(status_string(get_status_entries_from_tests(tests)), 436 status_string(self._job.status_entries)) 437 self.assertEqual(self._load_job_keyvals(), 438 {'tast_missing_test.0': 'pkg.Test'}) 439 440 def testHangingTest(self): 441 """Tests that a not-finished test is reported.""" 442 tests = [TestInfo('pkg.Test1', 0, 2), 443 TestInfo('pkg.Test2', 3, None), 444 TestInfo('pkg.Test3', None, None)] 445 self._init_tast_commands(tests) 446 self._run_test_for_failure([tests[1]], [tests[2]]) 447 self.assertEqual(status_string(get_status_entries_from_tests(tests)), 448 status_string(self._job.status_entries)) 449 self.assertEqual(self._load_job_keyvals(), 450 {'tast_missing_test.0': 'pkg.Test3'}) 451 452 def testRunError(self): 453 """Tests that a run error is reported for a non-finished test.""" 454 tests = [TestInfo('pkg.Test1', 0, 2), 455 TestInfo('pkg.Test2', 3, None), 456 TestInfo('pkg.Test3', None, None)] 457 self._init_tast_commands(tests) 458 459 # Simulate the run being aborted due to a lost SSH connection. 460 path = os.path.join(self._test.resultsdir, 461 tast.tast._RUN_ERROR_FILENAME) 462 msg = 'Lost SSH connection to DUT' 463 self._tast_commands['run'].files_to_write[path] = msg 464 self._tast_commands['run'].status = 1 465 466 self._run_test_for_failure([tests[1]], [tests[2]]) 467 self.assertEqual( 468 status_string(get_status_entries_from_tests(tests, msg)), 469 status_string(self._job.status_entries)) 470 self.assertEqual(self._load_job_keyvals(), 471 {'tast_missing_test.0': 'pkg.Test3'}) 472 473 def testNoTestsMatched(self): 474 """Tests that no error is raised if no tests are matched.""" 475 self._init_tast_commands([]) 476 self._run_test() 477 478 def testListCommandFails(self): 479 """Tests that an error is raised if the list command fails.""" 480 self._init_tast_commands([]) 481 482 # The list subcommand writes log messages to stderr on failure. 483 FAILURE_MSG = "failed to connect" 484 self._tast_commands['list'].status = 1 485 self._tast_commands['list'].stdout = None 486 self._tast_commands['list'].stderr = 'blah blah\n%s\n' % FAILURE_MSG 487 488 # The first line of the exception should include the last line of output 489 # from tast. 490 with self.assertRaises(error.TestFail) as cm: 491 self._run_test() 492 first_line = str(cm.exception).split('\n')[0] 493 self.assertTrue(FAILURE_MSG in first_line, 494 '"%s" not in "%s"' % (FAILURE_MSG, first_line)) 495 496 def testListCommandPrintsGarbage(self): 497 """Tests that an error is raised if the list command prints bad data.""" 498 self._init_tast_commands([]) 499 self._tast_commands['list'].stdout = 'not valid JSON data' 500 with self.assertRaises(error.TestFail) as _: 501 self._run_test() 502 503 def testRunCommandFails(self): 504 """Tests that an error is raised if the run command fails.""" 505 tests = [TestInfo('pkg.Test1', 0, 1), TestInfo('pkg.Test2', 2, 3)] 506 self._init_tast_commands(tests) 507 FAILURE_MSG = "this is the failure" 508 self._tast_commands['run'].status = 1 509 self._tast_commands['run'].stdout = 'blah blah\n%s\n' % FAILURE_MSG 510 511 with self.assertRaises(error.TestFail) as cm: 512 self._run_test() 513 self.assertTrue(FAILURE_MSG in str(cm.exception), 514 '"%s" not in "%s"' % (FAILURE_MSG, str(cm.exception))) 515 self.assertEqual(status_string(get_status_entries_from_tests(tests)), 516 status_string(self._job.status_entries)) 517 518 def testRunCommandWritesTrailingGarbage(self): 519 """Tests that an error is raised if the run command prints bad data.""" 520 tests = [TestInfo('pkg.Test1', 0, 1), TestInfo('pkg.Test2', None, None)] 521 self._init_tast_commands(tests) 522 self._tast_commands['run'].files_to_write[self._results_path()] += \ 523 'not valid JSON data' 524 with self.assertRaises(error.TestFail) as _: 525 self._run_test() 526 # Missing tests are reported in the _parse_results which is called after 527 # _run_tests, so missing tests is not available and the status_entries 528 # should include only the first test case. 529 self.assertEqual( 530 status_string(get_status_entries_from_tests(tests[:1])), 531 status_string(self._job.status_entries)) 532 533 def testRunCommandWithSharding(self): 534 """Tests that sharding parameter is passing thru without issues.""" 535 tests = [TestInfo('pkg.Test1', 0, 2), TestInfo('pkg.Test2', 3, 5)] 536 self._init_tast_commands(tests=tests, totalshards=2, shardindex=1) 537 self._run_test(totalshards=2, shardindex=1) 538 539 def testRunCommandWithCompanionDUTs(self): 540 """Tests that companion dut parameter is passing thru without issues.""" 541 tests = [TestInfo('pkg.Test1', 0, 2), TestInfo('pkg.Test2', 3, 5)] 542 companion_duts = {'role1': FakeHost('dut1', 22), 'role2':FakeHost('dut2', 22)} 543 self._init_tast_commands(tests=tests, companion_duts=companion_duts) 544 self._run_test(companion_duts=companion_duts) 545 546 def testRunCommandWithCompanionDUTsNoPort(self): 547 """Tests that companion dut parameter is passing thru without issues.""" 548 tests = [TestInfo('pkg.Test1', 0, 2), TestInfo('pkg.Test2', 3, 5)] 549 companion_duts = { 550 'role1': FakeHost('dut1', 22), 551 'role2': FakeHost('dut2', None) 552 } 553 self._init_tast_commands(tests=tests, companion_duts=companion_duts) 554 self._run_test(companion_duts=companion_duts) 555 556 def testRunCommandWithTestFilterFiles(self): 557 """Tests that companion dut parameter is passing thru without issues.""" 558 tests = [TestInfo('pkg.Test1', 0, 2), TestInfo('pkg.Test2', 3, 5)] 559 test_filter_files = ['filter_1.txt', 'filter_2.txt'] 560 self._init_tast_commands(tests=tests, 561 test_filter_files=test_filter_files) 562 self._run_test(test_filter_files=test_filter_files) 563 564 def testNoResultsFile(self): 565 """Tests that an error is raised if no results file is written.""" 566 tests = [TestInfo('pkg.Test1', None, None)] 567 self._init_tast_commands(tests) 568 self._tast_commands['run'].files_to_write = {} 569 with self.assertRaises(error.TestFail) as _: 570 self._run_test() 571 # Missing tests are reported in the _parse_results which is called after 572 # _run_tests, so missing tests is not available and the status_entries 573 # should be empty. 574 self.assertEqual(status_string(get_status_entries_from_tests([])), 575 status_string(self._job.status_entries)) 576 577 def testNoResultsFileAfterRunCommandFails(self): 578 """Tests that stdout is included in error after missing results.""" 579 tests = [TestInfo('pkg.Test1', None, None)] 580 self._init_tast_commands(tests) 581 FAILURE_MSG = "this is the failure" 582 self._tast_commands['run'].status = 1 583 self._tast_commands['run'].files_to_write = {} 584 self._tast_commands['run'].stdout = 'blah blah\n%s\n' % FAILURE_MSG 585 586 # The first line of the exception should include the last line of output 587 # from tast rather than a message about the missing results file. 588 with self.assertRaises(error.TestFail) as cm: 589 self._run_test() 590 first_line = str(cm.exception).split('\n')[0] 591 self.assertTrue(FAILURE_MSG in first_line, 592 '"%s" not in "%s"' % (FAILURE_MSG, first_line)) 593 # Missing tests are reported in the _parse_results which is called after 594 # _run_tests, so missing tests is not available and the status_entries 595 # should be empty. 596 self.assertEqual(status_string(get_status_entries_from_tests([])), 597 status_string(self._job.status_entries)) 598 599 def testMissingTastExecutable(self): 600 """Tests that an error is raised if the tast command isn't found.""" 601 os.remove(self._get_path_in_root(self._PORTAGE_TAST_PATH)) 602 with self.assertRaises(error.TestFail) as _: 603 self._run_test() 604 605 def testMissingRemoteTestRunner(self): 606 """Tests that an error is raised if remote_test_runner isn't found.""" 607 os.remove(self._get_path_in_root(self._PORTAGE_REMOTE_TEST_RUNNER_PATH)) 608 with self.assertRaises(error.TestFail) as _: 609 self._run_test() 610 611 def testMissingRemoteBundleDir(self): 612 """Tests that an error is raised if remote bundles aren't found.""" 613 shutil.rmtree(self._get_path_in_root(self._PORTAGE_REMOTE_BUNDLE_DIR)) 614 with self.assertRaises(error.TestFail) as _: 615 self._run_test() 616 617 def testSspPaths(self): 618 """Tests that files can be located at their alternate SSP locations.""" 619 for p in os.listdir(self._root_dir): 620 shutil.rmtree(os.path.join(self._root_dir, p)) 621 self._set_up_root(ssp=True) 622 623 tests = [TestInfo('pkg.Test', 0, 1)] 624 self._init_tast_commands(tests, ssp=True) 625 self._run_test(ssp=True) 626 self.assertEqual(status_string(get_status_entries_from_tests(tests)), 627 status_string(self._job.status_entries)) 628 629 def testBuild(self): 630 """Tests that Tast tests can be built.""" 631 tests = [TestInfo('pkg.Test', 0, 1)] 632 self._init_tast_commands(tests, build=True) 633 self._run_test(build=True) 634 self.assertEqual(status_string(get_status_entries_from_tests(tests)), 635 status_string(self._job.status_entries)) 636 637 def testFailureMessage(self): 638 """Tests that appropriate failure messages are generated.""" 639 # Just do this to initialize the self._test. 640 self._init_tast_commands([TestInfo('pkg.Test', 0, 0)]) 641 self._run_test() 642 643 msg = lambda f, m: self._test._get_failure_message(f, m) 644 self.assertEqual('', msg([], [])) 645 self.assertEqual('1 failed: t1', msg(['t1'], [])) 646 self.assertEqual('2 failed: t1 t2', msg(['t1', 't2'], [])) 647 self.assertEqual('1 missing: t1', msg([], ['t1'])) 648 self.assertEqual('1 failed: t1; 1 missing: t2', msg(['t1'], ['t2'])) 649 650 def testFailureMessageIgnoreTestFailures(self): 651 """Tests that test failures are ignored in messages when requested.""" 652 # Just do this to initialize the self._test. 653 self._init_tast_commands([TestInfo('pkg.Test', 0, 0)]) 654 self._run_test(ignore_test_failures=True) 655 656 msg = lambda f, m: self._test._get_failure_message(f, m) 657 self.assertEqual('', msg([], [])) 658 self.assertEqual('', msg(['t1'], [])) 659 self.assertEqual('1 missing: t1', msg([], ['t1'])) 660 self.assertEqual('1 missing: t2', msg(['t1'], ['t2'])) 661 662 def testNonAsciiFailureMessage(self): 663 """Tests that non-ascii failure message should be handled correctly""" 664 tests = [TestInfo('pkg.Test', 0, 2, errors=[('失敗', 1)])] 665 self._init_tast_commands(tests) 666 self._run_test(ignore_test_failures=True) 667 self.assertEqual(status_string(get_status_entries_from_tests(tests)), 668 status_string(self._job.status_entries)) 669 670 def testRunPrivateTests(self): 671 """Tests running private tests.""" 672 self._init_tast_commands([TestInfo('pkg.Test', 0, 0)], 673 run_private_tests=True) 674 self._run_test(ignore_test_failures=True, run_private_tests=True) 675 676 def testDownloadDataLazily(self): 677 """Tests downloading external data files lazily.""" 678 self._init_tast_commands([TestInfo('pkg.Test', 0, 0)], 679 download_data_lazily=True) 680 self._run_test(ignore_test_failures=True, download_data_lazily=True) 681 682 def testServoFromCommandArgs(self): 683 """Tests passing servo info via command-line arg.""" 684 SERVO_HOST = 'chromeos6-row2-rack21-labstation1.cros' 685 SERVO_PORT = '9995' 686 687 servo_var = 'servo=%s:%s' % (SERVO_HOST, SERVO_PORT) 688 self._init_tast_commands([TestInfo('pkg.Test', 0, 0)], 689 run_vars=[servo_var]) 690 691 # Simulate servo info being passed on the command line via --args. 692 args = [ 693 '%s=%s' % (servo_constants.SERVO_HOST_ATTR, SERVO_HOST), 694 '%s=%s' % (servo_constants.SERVO_PORT_ATTR, SERVO_PORT), 695 ] 696 self._run_test(command_args=args) 697 698 def testServoFromHostInfoStore(self): 699 """Tests getting servo info from the host.""" 700 SERVO_HOST = 'chromeos6-row2-rack21-labstation1.cros' 701 SERVO_PORT = '9995' 702 703 servo_var = 'servo=%s:%s' % (SERVO_HOST, SERVO_PORT) 704 self._init_tast_commands([TestInfo('pkg.Test', 0, 0)], 705 run_vars=[servo_var]) 706 707 # Simulate the host's servo info being stored in the Autotest DB. 708 attr = { 709 servo_constants.SERVO_HOST_ATTR: SERVO_HOST, 710 servo_constants.SERVO_PORT_ATTR: SERVO_PORT, 711 } 712 self._host.host_info_store.commit(host_info.HostInfo(attributes=attr)) 713 self._run_test() 714 715 def testWificellArgs(self): 716 """Tests passing Wificell specific args into Tast runner.""" 717 ROUTER_IP = '192.168.1.2:1234' 718 PCAP_IP = '192.168.1.3:2345' 719 wificell_var = [ 720 'router=%s' % ROUTER_IP, 721 'pcap=%s' % PCAP_IP, 722 'routers=%s,%s' % (ROUTER_IP, PCAP_IP), 723 ] 724 self._init_tast_commands([TestInfo('pkg.Test', 0, 0)], 725 run_vars=wificell_var) 726 727 WiFiManager = wifi_test_context_manager.WiFiTestContextManager 728 arg_list = [ 729 (WiFiManager.CMDLINE_ROUTER_ADDR, ROUTER_IP), 730 (WiFiManager.CMDLINE_PCAP_ADDR, PCAP_IP), 731 ] 732 args = [("%s=%s" % x) for x in arg_list] 733 self._run_test(command_args=args) 734 735 def testFirmwareArgs(self): 736 """Tests passing firmware specific args into Tast runner.""" 737 vars = ['firmware.no_ec_sync=true'] 738 self._init_tast_commands([TestInfo('pkg.Test', 0, 0)], run_vars=vars) 739 740 args = ['no_ec_sync=true'] 741 self._run_test(command_args=args) 742 743 def testCameraboxArgs(self): 744 """Tests passing camerabox specific args into Tast runner.""" 745 # Now it won't specify any chart IP address if it does not find a valid 746 # one. 747 vars = [] 748 self._init_tast_commands([TestInfo('pkg.Test', 0, 0)], run_vars=vars) 749 self._run_test(use_camera_box=True) 750 751 def testVarsfileOption(self): 752 with tempfile.NamedTemporaryFile( 753 suffix='.yaml', dir=self._temp_dir) as temp_file: 754 yaml.dump({ 755 "var1": "val1", 756 "var2": "val2" 757 }, 758 stream=temp_file, 759 encoding='utf-8') 760 varsfiles = [temp_file.name] 761 self._init_tast_commands([TestInfo('pkg.Test', 0, 0)], 762 run_varsfiles=varsfiles) 763 self._run_test(varsfiles=varsfiles) 764 765 def testVarslistOption(self): 766 varslist = ["var1=val1", "var2=val2"] 767 self._init_tast_commands([TestInfo('pkg.Test', 0, 0)], 768 run_vars=varslist) 769 self._run_test(varslist=varslist) 770 771 def testMaybeMissingVarsOption(self): 772 arg = '.*\.Test' 773 self._init_tast_commands([TestInfo('pkg.Test', 0, 0)], 774 maybemissingvars=arg) 775 self._run_test(maybemissingvars=arg) 776 777 def testFillExtvars(self): 778 with tempfile.NamedTemporaryFile(suffix='.yaml', 779 dir=self._temp_dir) as temp_file: 780 yaml.dump({ 781 'var1': 'val1', 782 'var2': 'val2' 783 }, 784 stream=temp_file, 785 encoding='utf-8') 786 787 host = FakeHost(self.HOST, self.PORT) 788 host.host_info_store = host_info.InMemoryHostInfoStore( 789 host_info.HostInfo(labels=[ 790 'plus', 'board:octopus', 'fleex', 'os:cros' 791 ])) 792 self._test.initialize( 793 host=host, 794 test_exprs=self.TEST_PATTERNS, 795 varsfiles=[temp_file.name], 796 varslist=['var3=val3', 'var4=val4'], 797 command_args=['arg1', 'arg2=arg2val', 'arg3:arg3val']) 798 self._test._tests_to_run = ['test1', 'test2'] 799 800 self.maxDiff = None 801 self.assertDictEqual( 802 { 803 'var:var1': 'val1', 804 'var:var2': 'val2', 805 'var:var3': 'val3', 806 'var:var4': 'val4', 807 'tests:': 'test1\ntest2', 808 'test:test1': '', 809 'test:test2': '', 810 'args:': 'arg1\narg2=arg2val\narg3:arg3val', 811 'arg:arg1': 'arg1val', 812 'arg:arg1': '', 813 'arg:arg2': 'arg2val', 814 'arg:arg3': 'arg3val', 815 'labels:': 'plus\nboard:octopus\nfleex\nos:cros', 816 'label:plus': '', 817 'label:board': 'octopus', 818 'label:fleex': '', 819 'label:os': 'cros', 820 }, self._test._fill_config_extvars()) 821 822 def test_dut_server_arg(self): 823 # Test positive single case 824 cargs = [ 825 'dut_servers=100.101.102.103:1111,', 'servo_host=localhost', 826 'servo_port=9999' 827 ] 828 dut_server = tast._dut_server_arg(cargs) 829 self.assertEqual(dut_server, '100.101.102.103:1111') 830 831 # Test no dut_servers provided case: 832 cargs = ['servo_host=localhost', 'servo_port=9999'] 833 dut_server = tast._dut_server_arg(cargs) 834 self.assertEqual(dut_server, None) 835 836 # Test multiple dut_servers provided case: 837 cargs = [ 838 'dut_servers=100.101.102.103:1111,localhost:1234', 839 'servo_host=localhost', 'servo_port=9999' 840 ] 841 dut_server = tast._dut_server_arg(cargs) 842 self.assertEqual(dut_server, '100.101.102.103:1111') 843 844 845class TestInfo: 846 """Wraps information about a Tast test. 847 848 This struct is used to: 849 - get test definitions printed by fake_tast.py's 'list' command 850 - get test results written by fake_tast.py's 'run' command 851 - get expected base_job.status_log_entry objects that unit tests compare 852 against what tast.Tast actually recorded 853 """ 854 855 def __init__(self, 856 name, 857 start_offset, 858 end_offset, 859 errors=None, 860 skip_reason=None, 861 attr=None, 862 timeout_ns=0, 863 missing_reason=None, 864 report_skipped=False): 865 """ 866 @param name: Name of the test, e.g. 'ui.ChromeLogin'. 867 @param start_offset: Start time as int seconds offset from BASE_TIME, 868 or None to indicate that tast didn't report a result for this test. 869 @param end_offset: End time as int seconds offset from BASE_TIME, or 870 None to indicate that tast reported that this test started but not 871 that it finished. 872 @param errors: List of (string, int) tuples containing reasons and 873 seconds offsets of errors encountered while running the test, or 874 None if no errors were encountered. 875 @param skip_reason: Human-readable reason that the test was skipped, or 876 None to indicate that it wasn't skipped. 877 @param attr: List of string test attributes assigned to the test, or 878 None if no attributes are assigned. 879 @param timeout_ns: Test timeout in nanoseconds. 880 @param report_skipped: Decide if skipped tests should be reported 881 """ 882 def from_offset(offset): 883 """Returns an offset from BASE_TIME. 884 885 @param offset: Offset as integer seconds. 886 @return: datetime.datetime object. 887 """ 888 if offset is None: 889 return None 890 return BASE_TIME + datetime.timedelta(seconds=offset) 891 892 self._name = name 893 self._start_time = from_offset(start_offset) 894 self._end_time = from_offset(end_offset) 895 self._errors = ( 896 [(e[0], from_offset(e[1])) for e in errors] if errors else []) 897 self._skip_reason = skip_reason 898 self._attr = list(attr) if attr else [] 899 self._timeout_ns = timeout_ns 900 self._report_skipped = report_skipped 901 902 def name(self): 903 # pylint: disable=missing-docstring 904 return self._name 905 906 def start_time(self): 907 # pylint: disable=missing-docstring 908 return self._start_time 909 910 def test(self): 911 """Returns a test dict printed by the 'list' command. 912 913 @return: dict representing a Tast testing.Test struct. 914 """ 915 return { 916 'name': self._name, 917 'attr': self._attr, 918 'timeout': self._timeout_ns, 919 } 920 921 def test_result(self): 922 """Returns a dict representing a result written by the 'run' command. 923 924 @return: dict representing a Tast TestResult struct. 925 """ 926 return { 927 'name': self._name, 928 'start': to_rfc3339(self._start_time), 929 'end': to_rfc3339(self._end_time), 930 'errors': [{'reason': e[0], 'time': to_rfc3339(e[1])} 931 for e in self._errors], 932 'skipReason': self._skip_reason, 933 'attr': self._attr, 934 'timeout': self._timeout_ns, 935 } 936 937 def status_entries(self, run_error_msg=None): 938 """Returns expected base_job.status_log_entry objects for this test. 939 940 @param run_error_msg: String containing run error message, or None if no 941 run error was encountered. 942 @return: List of Autotest base_job.status_log_entry objects. 943 """ 944 # Deliberately-skipped tests shouldn't have status entries unless errors 945 # were also reported. 946 if not self._report_skipped and self._skip_reason and not self._errors: 947 return [] 948 949 def make(status_code, dt, msg=''): 950 """Makes a base_job.status_log_entry. 951 952 @param status_code: String status code. 953 @param dt: datetime.datetime object containing entry time, and its 954 value should be None if the test is not supposed to be started 955 @param msg: String message (typically only set for errors). 956 @return: base_job.status_log_entry object. 957 """ 958 timestamp = int( 959 (dt - tast._UNIX_EPOCH).total_seconds()) if dt else None 960 return base_job.status_log_entry( 961 status_code, None, 962 tast.tast._TEST_NAME_PREFIX + self._name, msg, None, 963 timestamp=timestamp) 964 965 entries = [make(tast.tast._JOB_STATUS_START, self._start_time)] 966 967 if not self._start_time: 968 if run_error_msg: 969 reason = '%s due to global error: %s' % ( 970 tast.tast._TEST_DID_NOT_RUN_MSG, run_error_msg) 971 else: 972 reason = tast.tast._TEST_DID_NOT_RUN_MSG 973 974 entries.append(make(tast.tast._JOB_STATUS_NOSTATUS, None, reason)) 975 entries.append(make(tast.tast._JOB_STATUS_END_NOSTATUS, None)) 976 elif self._end_time and self._skip_reason and not self._errors: 977 entries.append( 978 make(tast.tast._JOB_STATUS_SKIP, self._end_time, 979 self._skip_reason)) 980 entries.append(make(tast.tast._JOB_STATUS_END_SKIP, 981 self._end_time)) 982 elif self._end_time and not self._errors: 983 entries.append(make(tast.tast._JOB_STATUS_GOOD, self._end_time)) 984 entries.append(make(tast.tast._JOB_STATUS_END_GOOD, self._end_time)) 985 else: 986 for e in self._errors: 987 entries.append(make(tast.tast._JOB_STATUS_FAIL, e[1], e[0])) 988 if not self._end_time: 989 # If the test didn't finish, the run error (if any) should be 990 # included. 991 if run_error_msg: 992 entries.append(make(tast.tast._JOB_STATUS_FAIL, 993 self._start_time, run_error_msg)) 994 entries.append(make(tast.tast._JOB_STATUS_FAIL, 995 self._start_time, 996 tast.tast._TEST_DID_NOT_FINISH_MSG)) 997 entries.append(make(tast.tast._JOB_STATUS_END_FAIL, 998 self._end_time or self._start_time or NOW)) 999 1000 return entries 1001 1002 1003class FakeServerJob: 1004 """Fake implementation of server_job from server/server_job.py.""" 1005 def __init__(self, result_dir, tmp_dir): 1006 self.pkgmgr = None 1007 self.autodir = None 1008 self.resultdir = result_dir 1009 self.tmpdir = tmp_dir 1010 self.post_run_hook = None 1011 self.status_entries = [] 1012 1013 def add_post_run_hook(self, hook): 1014 """Stub implementation of server_job.add_post_run_hook.""" 1015 self.post_run_hook = hook 1016 1017 def record_entry(self, entry, log_in_subdir=True): 1018 """Stub implementation of server_job.record_entry.""" 1019 assert(not log_in_subdir) 1020 self.status_entries.append(entry) 1021 1022 1023class FakeHost: 1024 """Fake implementation of AbstractSSHHost from server/hosts/abstract_ssh.py. 1025 """ 1026 def __init__(self, hostname, port): 1027 self.hostname = hostname 1028 self.port = port 1029 self.host_info_store = host_info.InMemoryHostInfoStore(None) 1030 1031 1032class TastCommand(object): 1033 """Args and behavior for fake_tast.py for a given command, e.g. "list".""" 1034 1035 def __init__(self, required_args, status=0, stdout=None, stderr=None, 1036 files_to_write=None): 1037 """ 1038 @param required_args: List of required args, each specified as 1039 'name=value'. Names correspond to argparse-provided names in 1040 fake_tast.py (typically just the flag name, e.g. 'build' or 1041 'resultsdir'). Values correspond to str() representations of the 1042 argparse-provided values. 1043 @param status: Status code for fake_tast.py to return. 1044 @param stdout: Data to write to stdout. 1045 @param stderr: Data to write to stderr. 1046 @param files_to_write: Dict mapping from paths of files to write to 1047 their contents, or None to not write any files. 1048 """ 1049 self.required_args = required_args 1050 self.status = status 1051 self.stdout = stdout 1052 self.stderr = stderr 1053 self.files_to_write = files_to_write if files_to_write else {} 1054 1055 1056def to_rfc3339(t): 1057 """Returns an RFC3339 timestamp. 1058 1059 @param t: UTC datetime.datetime object or None for the zero time. 1060 @return: String RFC3339 time, e.g. '2018-01-02T02:34:28Z'. 1061 """ 1062 if t is None: 1063 return '0001-01-01T00:00:00Z' 1064 assert(not t.utcoffset()) 1065 return t.strftime('%Y-%m-%dT%H:%M:%SZ') 1066 1067 1068def get_status_entries_from_tests(tests, run_error_msg=None): 1069 """Returns a flattened list of status entries from TestInfo objects. 1070 1071 @param tests: List of TestInfo objects. 1072 @param run_error_msg: String containing run error message, or None if no 1073 run error was encountered. 1074 @return: Flattened list of base_job.status_log_entry objects produced by 1075 calling status_entries() on each TestInfo object. 1076 """ 1077 return sum([t.status_entries(run_error_msg) for t in tests], []) 1078 1079 1080def status_string(entries): 1081 """Returns a string describing a list of base_job.status_log_entry objects. 1082 1083 @param entries: List of base_job.status_log_entry objects. 1084 @return: String containing space-separated representations of entries. 1085 """ 1086 found_test_strings = [] 1087 missing_test_strings = [] 1088 1089 # For each missing test, there are three corresponding entries that we want 1090 # to put in missing_test_strings: "START", "NOSTATUS" and "END NOSTATUS". 1091 # We cannot tell if the test is missing in the "START" entry. Therefore, 1092 # we use missing_tests to keep track of all the missing tests. 1093 missing_tests = set(entry.operation for entry in entries 1094 if entry.status_code == tast.tast._JOB_STATUS_NOSTATUS) 1095 1096 for entry in entries: 1097 message = entry.message 1098 if isinstance(message, six.binary_type): 1099 message = message.decode('utf-8') 1100 # Ignore timestamp for missing entry 1101 timestamp = entry.fields[base_job.status_log_entry.TIMESTAMP_FIELD] 1102 if entry.operation not in missing_tests: 1103 s = '[%s %s %s %s]' % (timestamp, entry.operation, 1104 entry.status_code, repr(message)) 1105 found_test_strings.append(s) 1106 else: 1107 s = '[%s %s %s]' % (entry.operation, entry.status_code, 1108 repr(message)) 1109 missing_test_strings.append(s) 1110 return ' '.join(found_test_strings + missing_test_strings) 1111 1112 1113if __name__ == '__main__': 1114 unittest.main() 1115