1#!/usr/bin/python3 2# Copyright 2016 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6from __future__ import absolute_import 7from __future__ import division 8from __future__ import print_function 9 10import six.moves.builtins 11import six.moves.queue 12import json 13import logging 14import os 15import shutil 16import signal 17import stat 18import subprocess 19import sys 20import tarfile 21import tempfile 22import time 23import unittest 24from unittest import mock 25 26import common 27 28from autotest_lib.client.common_lib import global_config 29from autotest_lib.client.common_lib import utils 30from autotest_lib.client.common_lib.test_utils.comparators import IsA 31 32#For unittest without cloud_client.proto compiled. 33try: 34 from autotest_lib.site_utils import cloud_console_client 35except ImportError: 36 cloud_console_client = None 37from autotest_lib.site_utils import gs_offloader 38from autotest_lib.site_utils import job_directories 39from autotest_lib.site_utils import job_directories_unittest as jd_test 40from autotest_lib.tko import models 41from autotest_lib.utils import gslib 42from autotest_lib.site_utils import pubsub_utils 43from autotest_lib.utils.frozen_chromite.lib import timeout_util 44from six.moves import range 45 46# Test value to use for `days_old`, if nothing else is required. 47_TEST_EXPIRATION_AGE = 7 48 49 50def _get_options(argv): 51 """Helper function to exercise command line parsing. 52 53 @param argv Value of sys.argv to be parsed. 54 55 """ 56 sys.argv = ['bogus.py'] + argv 57 return gs_offloader.parse_options() 58 59 60def is_fifo(path): 61 """Determines whether a path is a fifo. 62 63 @param path: fifo path string. 64 """ 65 return stat.S_ISFIFO(os.lstat(path).st_mode) 66 67 68def _get_fake_process(): 69 return FakeProcess() 70 71 72class FakeProcess(object): 73 """Fake process object.""" 74 75 def __init__(self): 76 self.returncode = 0 77 78 79 def wait(self): 80 return True 81 82 83class OffloaderOptionsTests(unittest.TestCase): 84 """Tests for the `Offloader` constructor. 85 86 Tests that offloader instance fields are set as expected 87 for given command line options. 88 89 """ 90 91 _REGULAR_ONLY = {job_directories.SwarmingJobDirectory, 92 job_directories.RegularJobDirectory} 93 _SPECIAL_ONLY = {job_directories.SwarmingJobDirectory, 94 job_directories.SpecialJobDirectory} 95 _BOTH = _REGULAR_ONLY | _SPECIAL_ONLY 96 97 98 def setUp(self): 99 super(OffloaderOptionsTests, self).setUp() 100 patcher = mock.patch.object(utils, 'get_offload_gsuri') 101 self.gsuri_patch = patcher.start() 102 self.addCleanup(patcher.stop) 103 104 gs_offloader.GS_OFFLOADING_ENABLED = True 105 gs_offloader.GS_OFFLOADER_MULTIPROCESSING = False 106 107 108 def _mock_get_sub_offloader(self, is_moblab, multiprocessing=False, 109 console_client=None, delete_age=0): 110 """Mock the process of getting the offload_dir function.""" 111 if is_moblab: 112 self.expected_gsuri = '%sresults/%s/%s/' % ( 113 global_config.global_config.get_config_value( 114 'CROS', 'image_storage_server'), 115 'Fa:ke:ma:c0:12:34', 'rand0m-uu1d') 116 else: 117 self.expected_gsuri = utils.DEFAULT_OFFLOAD_GSURI 118 utils.get_offload_gsuri.return_value = self.expected_gsuri 119 sub_offloader = gs_offloader.GSOffloader(self.expected_gsuri, 120 multiprocessing, delete_age, 121 console_client) 122 123 GsOffloader_patcher = mock.patch.object(gs_offloader, 'GSOffloader') 124 self.GsOffloader_patch = GsOffloader_patcher.start() 125 self.addCleanup(GsOffloader_patcher.stop) 126 127 if cloud_console_client: 128 console_patcher = mock.patch.object( 129 cloud_console_client, 'is_cloud_notification_enabled') 130 self.ccc_notification_patch = console_patcher.start() 131 self.addCleanup(console_patcher.stop) 132 133 if console_client: 134 cloud_console_client.is_cloud_notification_enabled.return_value = True 135 gs_offloader.GSOffloader.return_value = sub_offloader 136 else: 137 if cloud_console_client: 138 cloud_console_client.is_cloud_notification_enabled.return_value = False 139 gs_offloader.GSOffloader.return_value = sub_offloader 140 141 return sub_offloader 142 143 def _verify_sub_offloader(self, 144 is_moblab, 145 multiprocessing=False, 146 console_client=None, 147 delete_age=0): 148 if console_client: 149 self.GsOffloader_patch.assert_called_with( 150 self.expected_gsuri, multiprocessing, delete_age, 151 IsA(cloud_console_client.PubSubBasedClient)) 152 153 else: 154 self.GsOffloader_patch.assert_called_with(self.expected_gsuri, 155 multiprocessing, 156 delete_age, None) 157 158 def test_process_no_options(self): 159 """Test default offloader options.""" 160 sub_offloader = self._mock_get_sub_offloader(False) 161 offloader = gs_offloader.Offloader(_get_options([])) 162 self.assertEqual(set(offloader._jobdir_classes), 163 self._REGULAR_ONLY) 164 self.assertEqual(offloader._processes, 1) 165 self.assertEqual(offloader._gs_offloader, 166 sub_offloader) 167 self.assertEqual(offloader._upload_age_limit, 0) 168 self.assertEqual(offloader._delete_age_limit, 0) 169 self._verify_sub_offloader(False) 170 171 def test_process_all_option(self): 172 """Test offloader handling for the --all option.""" 173 sub_offloader = self._mock_get_sub_offloader(False) 174 offloader = gs_offloader.Offloader(_get_options(['--all'])) 175 self.assertEqual(set(offloader._jobdir_classes), self._BOTH) 176 self.assertEqual(offloader._processes, 1) 177 self.assertEqual(offloader._gs_offloader, 178 sub_offloader) 179 self.assertEqual(offloader._upload_age_limit, 0) 180 self.assertEqual(offloader._delete_age_limit, 0) 181 self._verify_sub_offloader(False) 182 183 184 def test_process_hosts_option(self): 185 """Test offloader handling for the --hosts option.""" 186 sub_offloader = self._mock_get_sub_offloader(False) 187 offloader = gs_offloader.Offloader( 188 _get_options(['--hosts'])) 189 self.assertEqual(set(offloader._jobdir_classes), 190 self._SPECIAL_ONLY) 191 self.assertEqual(offloader._processes, 1) 192 self.assertEqual(offloader._gs_offloader, 193 sub_offloader) 194 self.assertEqual(offloader._upload_age_limit, 0) 195 self.assertEqual(offloader._delete_age_limit, 0) 196 self._verify_sub_offloader(False) 197 198 199 def test_parallelism_option(self): 200 """Test offloader handling for the --parallelism option.""" 201 sub_offloader = self._mock_get_sub_offloader(False) 202 offloader = gs_offloader.Offloader( 203 _get_options(['--parallelism', '2'])) 204 self.assertEqual(set(offloader._jobdir_classes), 205 self._REGULAR_ONLY) 206 self.assertEqual(offloader._processes, 2) 207 self.assertEqual(offloader._gs_offloader, 208 sub_offloader) 209 self.assertEqual(offloader._upload_age_limit, 0) 210 self.assertEqual(offloader._delete_age_limit, 0) 211 self._verify_sub_offloader(False) 212 213 214 def test_delete_only_option(self): 215 """Test offloader handling for the --delete_only option.""" 216 offloader = gs_offloader.Offloader( 217 _get_options(['--delete_only'])) 218 self.assertEqual(set(offloader._jobdir_classes), 219 self._REGULAR_ONLY) 220 self.assertEqual(offloader._processes, 1) 221 self.assertIsInstance(offloader._gs_offloader, 222 gs_offloader.FakeGSOffloader) 223 self.assertEqual(offloader._upload_age_limit, 0) 224 self.assertEqual(offloader._delete_age_limit, 0) 225 226 227 def test_days_old_option(self): 228 """Test offloader handling for the --days_old option.""" 229 sub_offloader = self._mock_get_sub_offloader(False, delete_age=7) 230 offloader = gs_offloader.Offloader( 231 _get_options(['--days_old', '7'])) 232 self.assertEqual(set(offloader._jobdir_classes), 233 self._REGULAR_ONLY) 234 self.assertEqual(offloader._processes, 1) 235 self.assertEqual(offloader._gs_offloader, 236 sub_offloader) 237 self.assertEqual(offloader._upload_age_limit, 7) 238 self.assertEqual(offloader._delete_age_limit, 7) 239 self._verify_sub_offloader(False, delete_age=7) 240 241 242 def test_moblab_gsuri_generation(self): 243 """Test offloader construction for Moblab.""" 244 sub_offloader = self._mock_get_sub_offloader(True) 245 offloader = gs_offloader.Offloader(_get_options([])) 246 self.assertEqual(set(offloader._jobdir_classes), 247 self._REGULAR_ONLY) 248 self.assertEqual(offloader._processes, 1) 249 self.assertEqual(offloader._gs_offloader, 250 sub_offloader) 251 self.assertEqual(offloader._upload_age_limit, 0) 252 self.assertEqual(offloader._delete_age_limit, 0) 253 self._verify_sub_offloader(True) 254 255 256 def test_globalconfig_offloading_flag(self): 257 """Test enabling of --delete_only via global_config.""" 258 gs_offloader.GS_OFFLOADING_ENABLED = False 259 offloader = gs_offloader.Offloader( 260 _get_options([])) 261 self.assertIsInstance(offloader._gs_offloader, 262 gs_offloader.FakeGSOffloader) 263 264 def test_offloader_multiprocessing_flag_set(self): 265 """Test multiprocessing is set.""" 266 sub_offloader = self._mock_get_sub_offloader(True, True) 267 offloader = gs_offloader.Offloader(_get_options(['-m'])) 268 self.assertEqual(offloader._gs_offloader, 269 sub_offloader) 270 self._verify_sub_offloader(True, True) 271 272 def test_offloader_multiprocessing_flag_not_set_default_false(self): 273 """Test multiprocessing is set.""" 274 gs_offloader.GS_OFFLOADER_MULTIPROCESSING = False 275 sub_offloader = self._mock_get_sub_offloader(True, False) 276 offloader = gs_offloader.Offloader(_get_options([])) 277 self.assertEqual(offloader._gs_offloader, 278 sub_offloader) 279 self._verify_sub_offloader(True, False) 280 281 def test_offloader_multiprocessing_flag_not_set_default_true(self): 282 """Test multiprocessing is set.""" 283 gs_offloader.GS_OFFLOADER_MULTIPROCESSING = True 284 sub_offloader = self._mock_get_sub_offloader(True, True) 285 offloader = gs_offloader.Offloader(_get_options([])) 286 self.assertEqual(offloader._gs_offloader, 287 sub_offloader) 288 self._verify_sub_offloader(True, True) 289 290 291 def test_offloader_pubsub_enabled(self): 292 """Test multiprocessing is set.""" 293 if not cloud_console_client: 294 return 295 with mock.patch.object(pubsub_utils, "PubSubClient"): 296 sub_offloader = self._mock_get_sub_offloader( 297 True, False, cloud_console_client.PubSubBasedClient()) 298 offloader = gs_offloader.Offloader(_get_options([])) 299 self.assertEqual(offloader._gs_offloader, sub_offloader) 300 self._verify_sub_offloader( 301 True, False, cloud_console_client.PubSubBasedClient()) 302 303 304class _MockJobDirectory(job_directories._JobDirectory): 305 """Subclass of `_JobDirectory` used as a helper for tests.""" 306 307 GLOB_PATTERN = '[0-9]*-*' 308 309 310 def __init__(self, resultsdir): 311 """Create new job in initial state.""" 312 super(_MockJobDirectory, self).__init__(resultsdir) 313 self._timestamp = None 314 self.queue_args = [resultsdir, os.path.dirname(resultsdir), self._timestamp] 315 316 317 def get_timestamp_if_finished(self): 318 return self._timestamp 319 320 321 def set_finished(self, days_old): 322 """Make this job appear to be finished. 323 324 After calling this function, calls to `enqueue_offload()` 325 will find this job as finished, but not expired and ready 326 for offload. Note that when `days_old` is 0, 327 `enqueue_offload()` will treat a finished job as eligible 328 for offload. 329 330 @param days_old The value of the `days_old` parameter that 331 will be passed to `enqueue_offload()` for 332 testing. 333 334 """ 335 self._timestamp = jd_test.make_timestamp(days_old, False) 336 self.queue_args[2] = self._timestamp 337 338 339 def set_expired(self, days_old): 340 """Make this job eligible to be offloaded. 341 342 After calling this function, calls to `offload` will attempt 343 to offload this job. 344 345 @param days_old The value of the `days_old` parameter that 346 will be passed to `enqueue_offload()` for 347 testing. 348 349 """ 350 self._timestamp = jd_test.make_timestamp(days_old, True) 351 self.queue_args[2] = self._timestamp 352 353 354 def set_incomplete(self): 355 """Make this job appear to have failed offload just once.""" 356 self.offload_count += 1 357 self.first_offload_start = time.time() 358 if not os.path.isdir(self.dirname): 359 os.mkdir(self.dirname) 360 361 362 def set_reportable(self): 363 """Make this job be reportable.""" 364 self.set_incomplete() 365 self.offload_count += 1 366 367 368 def set_complete(self): 369 """Make this job be completed.""" 370 self.offload_count += 1 371 if os.path.isdir(self.dirname): 372 os.rmdir(self.dirname) 373 374 375 def process_gs_instructions(self): 376 """Always still offload the job directory.""" 377 return True 378 379 380class CommandListTests(unittest.TestCase): 381 """Tests for `_get_cmd_list()`.""" 382 383 def _command_list_assertions(self, job, use_rsync=True, multi=False): 384 """Call `_get_cmd_list()` and check the return value. 385 386 Check the following assertions: 387 * The command name (argv[0]) is 'gsutil'. 388 * '-m' option (argv[1]) is on when the argument, multi, is True. 389 * The arguments contain the 'cp' subcommand. 390 * The next-to-last argument (the source directory) is the 391 job's `queue_args[0]`. 392 * The last argument (the destination URL) is the job's 393 'queue_args[1]'. 394 395 @param job A job with properly calculated arguments to 396 `_get_cmd_list()` 397 @param use_rsync True when using 'rsync'. False when using 'cp'. 398 @param multi True when using '-m' option for gsutil. 399 400 """ 401 test_bucket_uri = 'gs://a-test-bucket' 402 403 gs_offloader.USE_RSYNC_ENABLED = use_rsync 404 405 gs_path = os.path.join(test_bucket_uri, job.queue_args[1]) 406 407 command = gs_offloader._get_cmd_list( 408 multi, job.queue_args[0], gs_path) 409 410 self.assertEqual(command[0], 'gsutil') 411 if multi: 412 self.assertEqual(command[1], '-m') 413 self.assertEqual(command[-2], job.queue_args[0]) 414 415 if use_rsync: 416 self.assertTrue('rsync' in command) 417 self.assertEqual(command[-1], 418 os.path.join(test_bucket_uri, job.queue_args[0])) 419 else: 420 self.assertTrue('cp' in command) 421 self.assertEqual(command[-1], 422 os.path.join(test_bucket_uri, job.queue_args[1])) 423 424 finish_command = gs_offloader._get_finish_cmd_list(gs_path) 425 self.assertEqual(finish_command[0], 'gsutil') 426 self.assertEqual(finish_command[1], 'cp') 427 self.assertEqual(finish_command[2], '/dev/null') 428 self.assertEqual(finish_command[3], 429 os.path.join(gs_path, '.finished_offload')) 430 431 432 def test__get_cmd_list_regular(self): 433 """Test `_get_cmd_list()` as for a regular job.""" 434 job = _MockJobDirectory('118-debug') 435 self._command_list_assertions(job) 436 437 438 def test__get_cmd_list_special(self): 439 """Test `_get_cmd_list()` as for a special job.""" 440 job = _MockJobDirectory('hosts/host1/118-reset') 441 self._command_list_assertions(job) 442 443 444 def test_get_cmd_list_regular_no_rsync(self): 445 """Test `_get_cmd_list()` as for a regular job.""" 446 job = _MockJobDirectory('118-debug') 447 self._command_list_assertions(job, use_rsync=False) 448 449 450 def test_get_cmd_list_special_no_rsync(self): 451 """Test `_get_cmd_list()` as for a special job.""" 452 job = _MockJobDirectory('hosts/host1/118-reset') 453 self._command_list_assertions(job, use_rsync=False) 454 455 456 def test_get_cmd_list_regular_multi(self): 457 """Test `_get_cmd_list()` as for a regular job with True multi.""" 458 job = _MockJobDirectory('118-debug') 459 self._command_list_assertions(job, multi=True) 460 461 462 def test__get_cmd_list_special_multi(self): 463 """Test `_get_cmd_list()` as for a special job with True multi.""" 464 job = _MockJobDirectory('hosts/host1/118-reset') 465 self._command_list_assertions(job, multi=True) 466 467 468class _TempResultsDirTestCase(unittest.TestCase): 469 """Mixin class for tests using a temporary results directory.""" 470 471 REGULAR_JOBLIST = [ 472 '111-fubar', '112-fubar', '113-fubar', '114-snafu'] 473 HOST_LIST = ['host1', 'host2', 'host3'] 474 SPECIAL_JOBLIST = [ 475 'hosts/host1/333-reset', 'hosts/host1/334-reset', 476 'hosts/host2/444-reset', 'hosts/host3/555-reset'] 477 478 479 def setUp(self): 480 super(_TempResultsDirTestCase, self).setUp() 481 self._resultsroot = tempfile.mkdtemp() 482 self._cwd = os.getcwd() 483 os.chdir(self._resultsroot) 484 485 486 def tearDown(self): 487 os.chdir(self._cwd) 488 shutil.rmtree(self._resultsroot) 489 super(_TempResultsDirTestCase, self).tearDown() 490 491 492 def make_job(self, jobdir): 493 """Create a job with results in `self._resultsroot`. 494 495 @param jobdir Name of the subdirectory to be created in 496 `self._resultsroot`. 497 498 """ 499 os.makedirs(jobdir) 500 return _MockJobDirectory(jobdir) 501 502 503 def make_job_hierarchy(self): 504 """Create a sample hierarchy of job directories. 505 506 `self.REGULAR_JOBLIST` is a list of directories for regular 507 jobs to be created; `self.SPECIAL_JOBLIST` is a list of 508 directories for special jobs to be created. 509 510 """ 511 for d in self.REGULAR_JOBLIST: 512 os.mkdir(d) 513 hostsdir = 'hosts' 514 os.mkdir(hostsdir) 515 for host in self.HOST_LIST: 516 os.mkdir(os.path.join(hostsdir, host)) 517 for d in self.SPECIAL_JOBLIST: 518 os.mkdir(d) 519 520 521class _TempResultsDirTestBase(_TempResultsDirTestCase, unittest.TestCase): 522 """Base test class for tests using a temporary results directory.""" 523 524 525class FailedOffloadsLogTest(_TempResultsDirTestBase): 526 """Test the formatting of failed offloads log file.""" 527 # Below is partial sample of a failed offload log file. This text is 528 # deliberately hard-coded and then parsed to create the test data; the idea 529 # is to make sure the actual text format will be reviewed by a human being. 530 # 531 # first offload count directory 532 # --+----1----+---- ----+ ----+----1----+----2----+----3 533 _SAMPLE_DIRECTORIES_REPORT = '''\ 534 =================== ====== ============================== 535 2014-03-14 15:09:26 1 118-fubar 536 2014-03-14 15:19:23 2 117-fubar 537 2014-03-14 15:29:20 6 116-fubar 538 2014-03-14 15:39:17 24 115-fubar 539 2014-03-14 15:49:14 120 114-fubar 540 2014-03-14 15:59:11 720 113-fubar 541 2014-03-14 16:09:08 5040 112-fubar 542 2014-03-14 16:19:05 40320 111-fubar 543 ''' 544 545 def setUp(self): 546 super(FailedOffloadsLogTest, self).setUp() 547 self._offloader = gs_offloader.Offloader(_get_options([])) 548 self._joblist = [] 549 for line in self._SAMPLE_DIRECTORIES_REPORT.split('\n')[1 : -1]: 550 date_, time_, count, dir_ = line.split() 551 job = _MockJobDirectory(dir_) 552 job.offload_count = int(count) 553 timestruct = time.strptime("%s %s" % (date_, time_), 554 gs_offloader.FAILED_OFFLOADS_TIME_FORMAT) 555 job.first_offload_start = time.mktime(timestruct) 556 # enter the jobs in reverse order, to make sure we 557 # test that the output will be sorted. 558 self._joblist.insert(0, job) 559 560 561 def assert_report_well_formatted(self, report_file): 562 """Assert that report file is well formatted. 563 564 @param report_file: Path to report file 565 """ 566 with open(report_file, 'r') as f: 567 report_lines = f.read().split() 568 569 for end_of_header_index in range(len(report_lines)): 570 if report_lines[end_of_header_index].startswith('=='): 571 break 572 self.assertLess(end_of_header_index, len(report_lines), 573 'Failed to find end-of-header marker in the report') 574 575 relevant_lines = report_lines[end_of_header_index:] 576 expected_lines = self._SAMPLE_DIRECTORIES_REPORT.split() 577 self.assertListEqual(relevant_lines, expected_lines) 578 579 580 def test_failed_offload_log_format(self): 581 """Trigger an e-mail report and check its contents.""" 582 log_file = os.path.join(self._resultsroot, 'failed_log') 583 report = self._offloader._log_failed_jobs_locally(self._joblist, 584 log_file=log_file) 585 self.assert_report_well_formatted(log_file) 586 587 588 def test_failed_offload_file_overwrite(self): 589 """Verify that we can saefly overwrite the log file.""" 590 log_file = os.path.join(self._resultsroot, 'failed_log') 591 with open(log_file, 'w') as f: 592 f.write('boohoohoo') 593 report = self._offloader._log_failed_jobs_locally(self._joblist, 594 log_file=log_file) 595 self.assert_report_well_formatted(log_file) 596 597 598class OffloadDirectoryTests(_TempResultsDirTestBase): 599 """Tests for `offload_dir()`.""" 600 601 def setUp(self): 602 super(OffloadDirectoryTests, self).setUp() 603 # offload_dir() logs messages; silence them. 604 self._saved_loglevel = logging.getLogger().getEffectiveLevel() 605 logging.getLogger().setLevel(logging.CRITICAL+1) 606 self._job = self.make_job(self.REGULAR_JOBLIST[0]) 607 608 cmd_list_patcher = mock.patch.object(gs_offloader, '_get_cmd_list') 609 cmd_list_patch = cmd_list_patcher.start() 610 self.addCleanup(cmd_list_patcher.stop) 611 612 alarm = mock.patch('signal.alarm', return_value=0) 613 alarm.start() 614 self.addCleanup(alarm.stop) 615 616 cmd_list_patcher = mock.patch.object(models.test, 'parse_job_keyval') 617 cmd_list_patch = cmd_list_patcher.start() 618 self.addCleanup(cmd_list_patcher.stop) 619 620 self.should_remove_sarming_req_dir = False 621 622 623 def tearDown(self): 624 logging.getLogger().setLevel(self._saved_loglevel) 625 super(OffloadDirectoryTests, self).tearDown() 626 627 def _mock_create_marker_file(self): 628 open_patcher = mock.patch.object(six.moves.builtins, 'open') 629 open_patch = open_patcher.start() 630 self.addCleanup(open_patcher.stop) 631 632 open.return_value = mock.MagicMock() 633 634 635 def _mock_offload_dir_calls(self, command, queue_args, 636 marker_initially_exists=False): 637 """Mock out the calls needed by `offload_dir()`. 638 639 This covers only the calls made when there is no timeout. 640 641 @param command Command list to be returned by the mocked 642 call to `_get_cmd_list()`. 643 644 """ 645 isfile_patcher = mock.patch.object(os.path, 'isfile') 646 isfile_patcher.start() 647 self.addCleanup(isfile_patcher.stop) 648 649 os.path.isfile.return_value = marker_initially_exists 650 command.append(queue_args[0]) 651 gs_offloader._get_cmd_list( 652 False, queue_args[0], 653 '%s%s' % (utils.DEFAULT_OFFLOAD_GSURI, 654 queue_args[1])).AndReturn(command) 655 656 657 def _run_offload_dir(self, should_succeed, delete_age): 658 """Make one call to `offload_dir()`. 659 660 The caller ensures all mocks are set up already. 661 662 @param should_succeed True iff the call to `offload_dir()` 663 is expected to succeed and remove the 664 offloaded job directory. 665 666 """ 667 gs_offloader.GSOffloader( 668 utils.DEFAULT_OFFLOAD_GSURI, False, delete_age).offload( 669 self._job.queue_args[0], 670 self._job.queue_args[1], 671 self._job.queue_args[2]) 672 self.assertEqual(not should_succeed, 673 os.path.isdir(self._job.queue_args[0])) 674 swarming_req_dir = gs_offloader._get_swarming_req_dir( 675 self._job.queue_args[0]) 676 if swarming_req_dir: 677 self.assertEqual(not self.should_remove_sarming_req_dir, 678 os.path.exists(swarming_req_dir)) 679 680 681 def test_offload_success(self): 682 """Test that `offload_dir()` can succeed correctly.""" 683 self._mock_offload_dir_calls(['test', '-d'], 684 self._job.queue_args) 685 os.path.isfile.return_value = True 686 self._mock_create_marker_file() 687 self._run_offload_dir(True, 0) 688 689 690 def test_offload_failure(self): 691 """Test that `offload_dir()` can fail correctly.""" 692 self._mock_offload_dir_calls(['test', '!', '-d'], 693 self._job.queue_args) 694 self._run_offload_dir(False, 0) 695 696 697 def test_offload_swarming_req_dir_remove(self): 698 """Test that `offload_dir()` can prune the empty swarming task dir.""" 699 should_remove = os.path.join('results', 'swarming-123abc0') 700 self._job = self.make_job(os.path.join(should_remove, '1')) 701 self._mock_offload_dir_calls(['test', '-d'], 702 self._job.queue_args) 703 704 os.path.isfile.return_value = True 705 self.should_remove_sarming_req_dir = True 706 self._mock_create_marker_file() 707 self._run_offload_dir(True, 0) 708 709 710 def test_offload_swarming_req_dir_exist(self): 711 """Test that `offload_dir()` keeps the non-empty swarming task dir.""" 712 should_not_remove = os.path.join('results', 'swarming-456edf0') 713 self._job = self.make_job(os.path.join(should_not_remove, '1')) 714 self.make_job(os.path.join(should_not_remove, '2')) 715 self._mock_offload_dir_calls(['test', '-d'], 716 self._job.queue_args) 717 718 os.path.isfile.return_value = True 719 self.should_remove_sarming_req_dir = False 720 self._mock_create_marker_file() 721 self._run_offload_dir(True, 0) 722 723 724 def test_sanitize_dir(self): 725 """Test that folder/file name with invalid character can be corrected. 726 """ 727 results_folder = tempfile.mkdtemp() 728 invalid_chars = '_'.join(['[', ']', '*', '?', '#']) 729 invalid_files = [] 730 invalid_folder_name = 'invalid_name_folder_%s' % invalid_chars 731 invalid_folder = os.path.join( 732 results_folder, 733 invalid_folder_name) 734 invalid_files.append(os.path.join( 735 invalid_folder, 736 'invalid_name_file_%s' % invalid_chars)) 737 good_folder = os.path.join(results_folder, 'valid_name_folder') 738 good_file = os.path.join(good_folder, 'valid_name_file') 739 for folder in [invalid_folder, good_folder]: 740 os.makedirs(folder) 741 for f in invalid_files + [good_file]: 742 with open(f, 'w'): 743 pass 744 # check that broken symlinks don't break sanitization 745 symlink = os.path.join(invalid_folder, 'broken-link') 746 os.symlink(os.path.join(results_folder, 'no-such-file'), 747 symlink) 748 fifo1 = os.path.join(results_folder, 'test_fifo1') 749 fifo2 = os.path.join(good_folder, 'test_fifo2') 750 fifo3 = os.path.join(invalid_folder, 'test_fifo3') 751 invalid_fifo4_name = 'test_fifo4_%s' % invalid_chars 752 fifo4 = os.path.join(invalid_folder, invalid_fifo4_name) 753 os.mkfifo(fifo1) 754 os.mkfifo(fifo2) 755 os.mkfifo(fifo3) 756 os.mkfifo(fifo4) 757 gs_offloader.sanitize_dir(results_folder) 758 for _, dirs, files in os.walk(results_folder): 759 for name in dirs + files: 760 self.assertEqual(name, gslib.escape(name)) 761 for c in name: 762 self.assertFalse(c in ['[', ']', '*', '?', '#']) 763 self.assertTrue(os.path.exists(good_file)) 764 765 self.assertTrue(os.path.exists(fifo1)) 766 self.assertFalse(is_fifo(fifo1)) 767 self.assertTrue(os.path.exists(fifo2)) 768 self.assertFalse(is_fifo(fifo2)) 769 corrected_folder = os.path.join( 770 results_folder, gslib.escape(invalid_folder_name)) 771 corrected_fifo3 = os.path.join( 772 corrected_folder, 773 'test_fifo3') 774 self.assertFalse(os.path.exists(fifo3)) 775 self.assertTrue(os.path.exists(corrected_fifo3)) 776 self.assertFalse(is_fifo(corrected_fifo3)) 777 corrected_fifo4 = os.path.join( 778 corrected_folder, gslib.escape(invalid_fifo4_name)) 779 self.assertFalse(os.path.exists(fifo4)) 780 self.assertTrue(os.path.exists(corrected_fifo4)) 781 self.assertFalse(is_fifo(corrected_fifo4)) 782 783 corrected_symlink = os.path.join( 784 corrected_folder, 785 'broken-link') 786 self.assertFalse(os.path.lexists(symlink)) 787 self.assertTrue(os.path.exists(corrected_symlink)) 788 self.assertFalse(os.path.islink(corrected_symlink)) 789 shutil.rmtree(results_folder) 790 791 792 def check_limit_file_count(self, is_test_job=True): 793 """Test that folder with too many files can be compressed. 794 795 @param is_test_job: True to check the method with test job result 796 folder. Set to False for special task folder. 797 """ 798 results_folder = tempfile.mkdtemp() 799 host_folder = os.path.join( 800 results_folder, 801 'lab1-host1' if is_test_job else 'hosts/lab1-host1/1-repair') 802 debug_folder = os.path.join(host_folder, 'debug') 803 sysinfo_folder = os.path.join(host_folder, 'sysinfo') 804 for folder in [debug_folder, sysinfo_folder]: 805 os.makedirs(folder) 806 for i in range(10): 807 with open(os.path.join(folder, str(i)), 'w') as f: 808 f.write('test') 809 810 gs_offloader._MAX_FILE_COUNT = 100 811 gs_offloader.limit_file_count( 812 results_folder if is_test_job else host_folder) 813 self.assertTrue(os.path.exists(sysinfo_folder)) 814 815 gs_offloader._MAX_FILE_COUNT = 10 816 gs_offloader.limit_file_count( 817 results_folder if is_test_job else host_folder) 818 self.assertFalse(os.path.exists(sysinfo_folder)) 819 self.assertTrue(os.path.exists(sysinfo_folder + '.tgz')) 820 self.assertTrue(os.path.exists(debug_folder)) 821 822 shutil.rmtree(results_folder) 823 824 825 def test_limit_file_count(self): 826 """Test that folder with too many files can be compressed. 827 """ 828 self.check_limit_file_count(is_test_job=True) 829 self.check_limit_file_count(is_test_job=False) 830 831 832 def test_get_metrics_fields(self): 833 """Test method _get_metrics_fields.""" 834 results_folder, host_folder = self._create_results_folder() 835 models.test.parse_job_keyval.return_value = ({ 836 'build': 'veyron_minnie-cheets-release/R52-8248.0.0', 837 'parent_job_id': 'p_id', 838 'suite': 'arc-cts' 839 }) 840 try: 841 self.assertEqual({'board': 'veyron_minnie-cheets', 842 'milestone': 'R52'}, 843 gs_offloader._get_metrics_fields(host_folder)) 844 finally: 845 shutil.rmtree(results_folder) 846 847 848 def _create_results_folder(self): 849 results_folder = tempfile.mkdtemp() 850 host_folder = os.path.join(results_folder, 'chromeos4-row9-rack11-host22') 851 852 # Build host keyvals set to parse model info. 853 host_info_path = os.path.join(host_folder, 'host_keyvals') 854 dir_to_create = '/' 855 for tdir in host_info_path.split('/'): 856 dir_to_create = os.path.join(dir_to_create, tdir) 857 if not os.path.exists(dir_to_create): 858 os.mkdir(dir_to_create) 859 with open(os.path.join(host_info_path, 'chromeos4-row9-rack11-host22'), 'w') as store_file: 860 store_file.write('labels=board%3Acoral,hw_video_acc_vp9,cros,'+ 861 'hw_jpeg_acc_dec,bluetooth,model%3Arobo360,'+ 862 'accel%3Acros-ec,'+ 863 'sku%3Arobo360_IntelR_CeleronR_CPU_N3450_1_10GHz_4Gb') 864 865 # .autoserv_execute file is needed for the test results package to look 866 # legit. 867 autoserve_path = os.path.join(host_folder, '.autoserv_execute') 868 with open(autoserve_path, 'w') as temp_file: 869 temp_file.write(' ') 870 871 return (results_folder, host_folder) 872 873 874class OffladerConfigTests(_TempResultsDirTestBase): 875 """Tests for the `Offloader` to follow side_effect config.""" 876 877 def setUp(self): 878 super(OffladerConfigTests, self).setUp() 879 gs_offloader.GS_OFFLOADING_ENABLED = True 880 gs_offloader.GS_OFFLOADER_MULTIPROCESSING = True 881 self.dest_path = '/results' 882 883 metrics_fields_patcher = mock.patch.object(gs_offloader, 884 '_get_metrics_fields') 885 metrics_fields_patcher.start() 886 self.addCleanup(metrics_fields_patcher.stop) 887 888 offloadError_patcher = mock.patch.object(gs_offloader, '_OffloadError') 889 offloadError_patcher.start() 890 self.addCleanup(offloadError_patcher.stop) 891 892 offload_metrics_patcher = mock.patch.object(gs_offloader, 893 '_emit_offload_metrics') 894 offload_metrics_patcher.start() 895 self.addCleanup(offload_metrics_patcher.stop) 896 897 cmd_list_patcher = mock.patch.object(gs_offloader, '_get_cmd_list') 898 cmd_list_patcher.start() 899 self.addCleanup(cmd_list_patcher.stop) 900 901 Popen_patcher = mock.patch.object(subprocess, 'Popen') 902 Popen_patcher.start() 903 self.addCleanup(Popen_patcher.stop) 904 905 returncode_metric_patcher = mock.patch.object( 906 gs_offloader, '_emit_gs_returncode_metric') 907 returncode_metric_patcher.start() 908 self.addCleanup(returncode_metric_patcher.stop) 909 910 def _run(self, results_dir, gs_bucket, expect_dest): 911 stdout = os.path.join(results_dir, 'std.log') 912 stderr = os.path.join(results_dir, 'std.err') 913 config = { 914 'tko': { 915 'proxy_socket': '/file-system/foo-socket', 916 'mysql_user': 'foo-user', 917 'mysql_password_file': '/file-system/foo-password-file' 918 }, 919 'google_storage': { 920 'bucket': gs_bucket, 921 'credentials_file': '/foo-creds' 922 }, 923 'this_field_is_ignored': True 924 } 925 path = os.path.join(results_dir, 'side_effects_config.json') 926 with open(path, 'w') as f: 927 f.write(json.dumps(config)) 928 gs_offloader._get_metrics_fields(results_dir) 929 gs_offloader._get_cmd_list.return_value = ['test', '-d', expect_dest] 930 subprocess.Popen.side_effect = [ 931 _get_fake_process(), _get_fake_process() 932 ] 933 gs_offloader._OffloadError(mock.ANY) 934 gs_offloader._emit_gs_returncode_metric.return_value = True 935 gs_offloader._emit_offload_metrics.return_value = True 936 sub_offloader = gs_offloader.GSOffloader(results_dir, True, 0, None) 937 sub_offloader._try_offload(results_dir, self.dest_path, stdout, stderr) 938 shutil.rmtree(results_dir) 939 940 def _verify(self, results_dir, gs_bucket, expect_dest): 941 gs_offloader._get_cmd_list.assert_called_with(True, mock.ANY, 942 expect_dest) 943 944 stdout = os.path.join(results_dir, 'std.log') 945 stderr = os.path.join(results_dir, 'std.err') 946 947 # x2 948 subprocess_call = mock.call(mock.ANY, stdout=stdout, stderr=stderr) 949 subprocess.Popen.assert_has_calls([subprocess_call, subprocess_call]) 950 951 def test_skip_gs_prefix(self): 952 """Test skip the 'gs://' prefix if already presented.""" 953 res = tempfile.mkdtemp() 954 gs_bucket = 'gs://prod-bucket' 955 expect_dest = gs_bucket + self.dest_path 956 self._run(res, gs_bucket, expect_dest) 957 self._verify(res, gs_bucket, expect_dest) 958 959 960class JobDirectoryOffloadTests(_TempResultsDirTestBase): 961 """Tests for `_JobDirectory.enqueue_offload()`. 962 963 When testing with a `days_old` parameter of 0, we use 964 `set_finished()` instead of `set_expired()`. This causes the 965 job's timestamp to be set in the future. This is done so as 966 to test that when `days_old` is 0, the job is always treated 967 as eligible for offload, regardless of the timestamp's value. 968 969 Testing covers the following assertions: 970 A. Each time `enqueue_offload()` is called, a message that 971 includes the job's directory name will be logged using 972 `logging.debug()`, regardless of whether the job was 973 enqueued. Nothing else is allowed to be logged. 974 B. If the job is not eligible to be offloaded, 975 `first_offload_start` and `offload_count` are 0. 976 C. If the job is not eligible for offload, nothing is 977 enqueued in `queue`. 978 D. When the job is offloaded, `offload_count` increments 979 each time. 980 E. When the job is offloaded, the appropriate parameters are 981 enqueued exactly once. 982 F. The first time a job is offloaded, `first_offload_start` is 983 set to the current time. 984 G. `first_offload_start` only changes the first time that the 985 job is offloaded. 986 987 The test cases below are designed to exercise all of the 988 meaningful state transitions at least once. 989 990 """ 991 992 def setUp(self): 993 super(JobDirectoryOffloadTests, self).setUp() 994 self._job = self.make_job(self.REGULAR_JOBLIST[0]) 995 self._queue = six.moves.queue.Queue() 996 997 998 def _offload_unexpired_job(self, days_old): 999 """Make calls to `enqueue_offload()` for an unexpired job. 1000 1001 This method tests assertions B and C that calling 1002 `enqueue_offload()` has no effect. 1003 1004 """ 1005 self.assertEqual(self._job.offload_count, 0) 1006 self.assertEqual(self._job.first_offload_start, 0) 1007 gs_offloader._enqueue_offload(self._job, self._queue, days_old) 1008 gs_offloader._enqueue_offload(self._job, self._queue, days_old) 1009 self.assertTrue(self._queue.empty()) 1010 self.assertEqual(self._job.offload_count, 0) 1011 self.assertEqual(self._job.first_offload_start, 0) 1012 1013 1014 def _offload_expired_once(self, days_old, count): 1015 """Make one call to `enqueue_offload()` for an expired job. 1016 1017 This method tests assertions D and E regarding side-effects 1018 expected when a job is offloaded. 1019 1020 """ 1021 gs_offloader._enqueue_offload(self._job, self._queue, days_old) 1022 self.assertEqual(self._job.offload_count, count) 1023 self.assertFalse(self._queue.empty()) 1024 v = self._queue.get_nowait() 1025 self.assertTrue(self._queue.empty()) 1026 self.assertEqual(v, self._job.queue_args) 1027 1028 1029 def _offload_expired_job(self, days_old): 1030 """Make calls to `enqueue_offload()` for a just-expired job. 1031 1032 This method directly tests assertions F and G regarding 1033 side-effects on `first_offload_start`. 1034 1035 """ 1036 t0 = time.time() 1037 self._offload_expired_once(days_old, 1) 1038 t1 = self._job.first_offload_start 1039 self.assertLessEqual(t1, time.time()) 1040 self.assertGreaterEqual(t1, t0) 1041 self._offload_expired_once(days_old, 2) 1042 self.assertEqual(self._job.first_offload_start, t1) 1043 self._offload_expired_once(days_old, 3) 1044 self.assertEqual(self._job.first_offload_start, t1) 1045 1046 1047 def test_case_1_no_expiration(self): 1048 """Test a series of `enqueue_offload()` calls with `days_old` of 0. 1049 1050 This tests that offload works as expected if calls are 1051 made both before and after the job becomes expired. 1052 1053 """ 1054 self._offload_unexpired_job(0) 1055 self._job.set_finished(0) 1056 self._offload_expired_job(0) 1057 1058 1059 def test_case_2_no_expiration(self): 1060 """Test a series of `enqueue_offload()` calls with `days_old` of 0. 1061 1062 This tests that offload works as expected if calls are made 1063 only after the job becomes expired. 1064 1065 """ 1066 self._job.set_finished(0) 1067 self._offload_expired_job(0) 1068 1069 1070 def test_case_1_with_expiration(self): 1071 """Test a series of `enqueue_offload()` calls with `days_old` non-zero. 1072 1073 This tests that offload works as expected if calls are made 1074 before the job finishes, before the job expires, and after 1075 the job expires. 1076 1077 """ 1078 self._offload_unexpired_job(_TEST_EXPIRATION_AGE) 1079 self._job.set_finished(_TEST_EXPIRATION_AGE) 1080 self._offload_unexpired_job(_TEST_EXPIRATION_AGE) 1081 self._job.set_expired(_TEST_EXPIRATION_AGE) 1082 self._offload_expired_job(_TEST_EXPIRATION_AGE) 1083 1084 1085 def test_case_2_with_expiration(self): 1086 """Test a series of `enqueue_offload()` calls with `days_old` non-zero. 1087 1088 This tests that offload works as expected if calls are made 1089 between finishing and expiration, and after the job expires. 1090 1091 """ 1092 self._job.set_finished(_TEST_EXPIRATION_AGE) 1093 self._offload_unexpired_job(_TEST_EXPIRATION_AGE) 1094 self._job.set_expired(_TEST_EXPIRATION_AGE) 1095 self._offload_expired_job(_TEST_EXPIRATION_AGE) 1096 1097 1098 def test_case_3_with_expiration(self): 1099 """Test a series of `enqueue_offload()` calls with `days_old` non-zero. 1100 1101 This tests that offload works as expected if calls are made 1102 only before finishing and after expiration. 1103 1104 """ 1105 self._offload_unexpired_job(_TEST_EXPIRATION_AGE) 1106 self._job.set_expired(_TEST_EXPIRATION_AGE) 1107 self._offload_expired_job(_TEST_EXPIRATION_AGE) 1108 1109 1110 def test_case_4_with_expiration(self): 1111 """Test a series of `enqueue_offload()` calls with `days_old` non-zero. 1112 1113 This tests that offload works as expected if calls are made 1114 only after expiration. 1115 1116 """ 1117 self._job.set_expired(_TEST_EXPIRATION_AGE) 1118 self._offload_expired_job(_TEST_EXPIRATION_AGE) 1119 1120 1121class GetJobDirectoriesTests(_TempResultsDirTestBase): 1122 """Tests for `_JobDirectory.get_job_directories()`.""" 1123 1124 def setUp(self): 1125 super(GetJobDirectoriesTests, self).setUp() 1126 self.make_job_hierarchy() 1127 os.mkdir('not-a-job') 1128 open('not-a-dir', 'w').close() 1129 1130 1131 def _run_get_directories(self, cls, expected_list): 1132 """Test `get_job_directories()` for the given class. 1133 1134 Calls the method, and asserts that the returned list of 1135 directories matches the expected return value. 1136 1137 @param expected_list Expected return value from the call. 1138 """ 1139 dirlist = cls.get_job_directories() 1140 self.assertEqual(set(dirlist), set(expected_list)) 1141 1142 1143 def test_get_regular_jobs(self): 1144 """Test `RegularJobDirectory.get_job_directories()`.""" 1145 self._run_get_directories(job_directories.RegularJobDirectory, 1146 self.REGULAR_JOBLIST) 1147 1148 1149 def test_get_special_jobs(self): 1150 """Test `SpecialJobDirectory.get_job_directories()`.""" 1151 self._run_get_directories(job_directories.SpecialJobDirectory, 1152 self.SPECIAL_JOBLIST) 1153 1154 1155class AddJobsTests(_TempResultsDirTestBase): 1156 """Tests for `Offloader._add_new_jobs()`.""" 1157 1158 MOREJOBS = ['115-fubar', '116-fubar', '117-fubar', '118-snafu'] 1159 1160 def setUp(self): 1161 super(AddJobsTests, self).setUp() 1162 self._initial_job_names = ( 1163 set(self.REGULAR_JOBLIST) | set(self.SPECIAL_JOBLIST)) 1164 self.make_job_hierarchy() 1165 self._offloader = gs_offloader.Offloader(_get_options(['-a'])) 1166 1167 logging_patcher = mock.patch.object(logging, 'debug') 1168 self.logging_patch = logging_patcher.start() 1169 self.addCleanup(logging_patcher.stop) 1170 1171 def _run_add_new_jobs(self, expected_key_set): 1172 """Basic test assertions for `_add_new_jobs()`. 1173 1174 Asserts the following: 1175 * The keys in the offloader's `_open_jobs` dictionary 1176 matches the expected set of keys. 1177 * For every job in `_open_jobs`, the job has the expected 1178 directory name. 1179 1180 """ 1181 count = len(expected_key_set) - len(self._offloader._open_jobs) 1182 self._offloader._add_new_jobs() 1183 self.assertEqual(expected_key_set, 1184 set(self._offloader._open_jobs.keys())) 1185 for jobkey, job in self._offloader._open_jobs.items(): 1186 self.assertEqual(jobkey, job.dirname) 1187 1188 self.logging_patch.assert_called_with(mock.ANY, count) 1189 1190 def test_add_jobs_empty(self): 1191 """Test adding jobs to an empty dictionary. 1192 1193 Calls the offloader's `_add_new_jobs()`, then perform 1194 the assertions of `self._check_open_jobs()`. 1195 1196 """ 1197 self._run_add_new_jobs(self._initial_job_names) 1198 1199 1200 def test_add_jobs_non_empty(self): 1201 """Test adding jobs to a non-empty dictionary. 1202 1203 Calls the offloader's `_add_new_jobs()` twice; once from 1204 initial conditions, and then again after adding more 1205 directories. After the second call, perform the assertions 1206 of `self._check_open_jobs()`. Additionally, assert that 1207 keys added by the first call still map to their original 1208 job object after the second call. 1209 1210 """ 1211 self._run_add_new_jobs(self._initial_job_names) 1212 jobs_copy = self._offloader._open_jobs.copy() 1213 for d in self.MOREJOBS: 1214 os.mkdir(d) 1215 self._run_add_new_jobs(self._initial_job_names | 1216 set(self.MOREJOBS)) 1217 for key in jobs_copy.keys(): 1218 self.assertIs(jobs_copy[key], 1219 self._offloader._open_jobs[key]) 1220 1221 1222class ReportingTests(_TempResultsDirTestBase): 1223 """Tests for `Offloader._report_failed_jobs()`.""" 1224 1225 def setUp(self): 1226 super(ReportingTests, self).setUp() 1227 self._offloader = gs_offloader.Offloader(_get_options([])) 1228 1229 failed_jobs_patcher = mock.patch.object(self._offloader, 1230 '_log_failed_jobs_locally') 1231 self.failed_jobs_patch = failed_jobs_patcher.start() 1232 self.addCleanup(failed_jobs_patcher.stop) 1233 1234 logging_patcher = mock.patch.object(logging, 'debug') 1235 self.logging_patch = logging_patcher.start() 1236 self.addCleanup(logging_patcher.stop) 1237 1238 def _add_job(self, jobdir): 1239 """Add a job to the dictionary of unfinished jobs.""" 1240 j = self.make_job(jobdir) 1241 self._offloader._open_jobs[j.dirname] = j 1242 return j 1243 1244 1245 def _expect_log_message(self, new_open_jobs, with_failures, count=None): 1246 """Mock expected logging calls. 1247 1248 `_report_failed_jobs()` logs one message with the number 1249 of jobs removed from the open job set and the number of jobs 1250 still remaining. Additionally, if there are reportable 1251 jobs, then it logs the number of jobs that haven't yet 1252 offloaded. 1253 1254 This sets up the logging calls using `new_open_jobs` to 1255 figure the job counts. If `with_failures` is true, then 1256 the log message is set up assuming that all jobs in 1257 `new_open_jobs` have offload failures. 1258 1259 @param new_open_jobs New job set for calculating counts 1260 in the messages. 1261 @param with_failures Whether the log message with a 1262 failure count is expected. 1263 1264 """ 1265 if not count: 1266 count = len(self._offloader._open_jobs) - len(new_open_jobs) 1267 self.logging_patch.assert_called_with(mock.ANY, count, 1268 len(new_open_jobs)) 1269 if with_failures: 1270 self.logging_patch.assert_called_with(mock.ANY, len(new_open_jobs)) 1271 1272 def _run_update(self, new_open_jobs): 1273 """Call `_report_failed_jobs()`. 1274 1275 Initial conditions are set up by the caller. This calls 1276 `_report_failed_jobs()` once, and then checks these 1277 assertions: 1278 * The offloader's new `_open_jobs` field contains only 1279 the entries in `new_open_jobs`. 1280 1281 @param new_open_jobs A dictionary representing the expected 1282 new value of the offloader's 1283 `_open_jobs` field. 1284 """ 1285 self._offloader._report_failed_jobs() 1286 self._offloader._remove_offloaded_jobs() 1287 self.assertEqual(self._offloader._open_jobs, new_open_jobs) 1288 1289 1290 def _expect_failed_jobs(self, failed_jobs): 1291 """Mock expected call to log the failed jobs on local disk. 1292 1293 TODO(crbug.com/686904): The fact that we have to mock an internal 1294 function for this test is evidence that we need to pull out the local 1295 file formatter in its own object in a future CL. 1296 1297 @param failed_jobs: The list of jobs being logged as failed. 1298 """ 1299 self._offloader._log_failed_jobs_locally(failed_jobs) 1300 1301 1302 def test_no_jobs(self): 1303 """Test `_report_failed_jobs()` with no open jobs. 1304 1305 Initial conditions are an empty `_open_jobs` list. 1306 Expected result is an empty `_open_jobs` list. 1307 1308 """ 1309 self._expect_failed_jobs([]) 1310 self._run_update({}) 1311 self._expect_log_message({}, False) 1312 1313 1314 def test_all_completed(self): 1315 """Test `_report_failed_jobs()` with only complete jobs. 1316 1317 Initial conditions are an `_open_jobs` list consisting of only completed 1318 jobs. 1319 Expected result is an empty `_open_jobs` list. 1320 1321 """ 1322 1323 for d in self.REGULAR_JOBLIST: 1324 self._add_job(d).set_complete() 1325 count = len(self._offloader._open_jobs) 1326 self._expect_failed_jobs([]) 1327 self._run_update({}) 1328 self._expect_log_message({}, False, count) 1329 1330 1331 def test_none_finished(self): 1332 """Test `_report_failed_jobs()` with only unfinished jobs. 1333 1334 Initial conditions are an `_open_jobs` list consisting of only 1335 unfinished jobs. 1336 Expected result is no change to the `_open_jobs` list. 1337 1338 """ 1339 for d in self.REGULAR_JOBLIST: 1340 self._add_job(d) 1341 new_jobs = self._offloader._open_jobs.copy() 1342 self._expect_failed_jobs([]) 1343 self._run_update(new_jobs) 1344 self._expect_log_message(new_jobs, False) 1345 1346 1347class GsOffloaderMockTests(_TempResultsDirTestCase): 1348 """Tests using mock instead of mox.""" 1349 1350 def setUp(self): 1351 super(GsOffloaderMockTests, self).setUp() 1352 alarm = mock.patch('signal.alarm', return_value=0) 1353 alarm.start() 1354 self.addCleanup(alarm.stop) 1355 1356 self._saved_loglevel = logging.getLogger().getEffectiveLevel() 1357 logging.getLogger().setLevel(logging.CRITICAL + 1) 1358 1359 self._job = self.make_job(self.REGULAR_JOBLIST[0]) 1360 1361 1362 def test_offload_timeout_early(self): 1363 """Test that `offload_dir()` times out correctly. 1364 1365 This test triggers timeout at the earliest possible moment, 1366 at the first call to set the timeout alarm. 1367 1368 """ 1369 signal.alarm.side_effect = [0, timeout_util.TimeoutError('fubar')] 1370 gs_offloader.GSOffloader( 1371 utils.DEFAULT_OFFLOAD_GSURI, False, 0).offload( 1372 self._job.queue_args[0], 1373 self._job.queue_args[1], 1374 self._job.queue_args[2]) 1375 self.assertTrue(os.path.isdir(self._job.queue_args[0])) 1376 1377 1378 # TODO(ayatane): This tests passes when run locally, but it fails 1379 # when run on trybot. I have no idea why, but the assert isdir 1380 # fails. 1381 # 1382 # This test is also kind of redundant since we are using the timeout 1383 # from chromite which has its own tests. 1384 @unittest.skip('This fails on trybot') 1385 def test_offload_timeout_late(self): 1386 """Test that `offload_dir()` times out correctly. 1387 1388 This test triggers timeout at the latest possible moment, at 1389 the call to clear the timeout alarm. 1390 1391 """ 1392 signal.alarm.side_effect = [0, 0, timeout_util.TimeoutError('fubar')] 1393 with mock.patch.object(gs_offloader, '_get_cmd_list', 1394 autospec=True) as get_cmd_list: 1395 get_cmd_list.return_value = ['test', '-d', self._job.queue_args[0]] 1396 gs_offloader.GSOffloader( 1397 utils.DEFAULT_OFFLOAD_GSURI, False, 0).offload( 1398 self._job.queue_args[0], 1399 self._job.queue_args[1], 1400 self._job.queue_args[2]) 1401 self.assertTrue(os.path.isdir(self._job.queue_args[0])) 1402 1403 1404 1405if __name__ == '__main__': 1406 unittest.main() 1407