xref: /aosp_15_r20/external/autotest/site_utils/gs_offloader_unittest.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1#!/usr/bin/python3
2# Copyright 2016 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6from __future__ import absolute_import
7from __future__ import division
8from __future__ import print_function
9
10import six.moves.builtins
11import six.moves.queue
12import json
13import logging
14import os
15import shutil
16import signal
17import stat
18import subprocess
19import sys
20import tarfile
21import tempfile
22import time
23import unittest
24from unittest import mock
25
26import common
27
28from autotest_lib.client.common_lib import global_config
29from autotest_lib.client.common_lib import utils
30from autotest_lib.client.common_lib.test_utils.comparators import IsA
31
# For running unit tests without cloud_client.proto compiled.
33try:
34    from autotest_lib.site_utils import cloud_console_client
35except ImportError:
36    cloud_console_client = None
37from autotest_lib.site_utils import gs_offloader
38from autotest_lib.site_utils import job_directories
39from autotest_lib.site_utils import job_directories_unittest as jd_test
40from autotest_lib.tko import models
41from autotest_lib.utils import gslib
42from autotest_lib.site_utils import pubsub_utils
43from autotest_lib.utils.frozen_chromite.lib import timeout_util
44from six.moves import range
45
46# Test value to use for `days_old`, if nothing else is required.
47_TEST_EXPIRATION_AGE = 7
48
49
def _get_options(argv):
    """Parse `argv` as if it were the offloader's command line.

    @param argv Argument list to parse (without the program name,
                which is supplied here).
    """
    # parse_options() reads sys.argv directly, so install our argv there.
    sys.argv = ['bogus.py'] + list(argv)
    return gs_offloader.parse_options()
58
59
def is_fifo(path):
    """Return True if `path` is a FIFO (named pipe).

    Uses lstat(), so the path itself is examined without following a
    trailing symlink.

    @param path: fifo path string.
    """
    mode = os.lstat(path).st_mode
    return stat.S_ISFIFO(mode)
66
67
def _get_fake_process():
    """Return a fresh `FakeProcess` stub (stands in for a Popen result)."""
    return FakeProcess()
70
71
class FakeProcess(object):
    """Minimal stand-in for a subprocess handle.

    Models a process that has already exited cleanly: `returncode`
    is 0 and `wait()` returns immediately.
    """

    def __init__(self):
        # Emulate a successful exit status.
        self.returncode = 0

    def wait(self):
        """Pretend to wait for the process; it is already finished."""
        return True
81
82
class OffloaderOptionsTests(unittest.TestCase):
    """Tests for the `Offloader` constructor.

    Tests that offloader instance fields are set as expected
    for given command line options.

    """

    # Expected sets of job-directory classes selected by each option mode.
    # Swarming job directories are included in every mode.
    _REGULAR_ONLY = {job_directories.SwarmingJobDirectory,
                     job_directories.RegularJobDirectory}
    _SPECIAL_ONLY = {job_directories.SwarmingJobDirectory,
                     job_directories.SpecialJobDirectory}
    _BOTH = _REGULAR_ONLY | _SPECIAL_ONLY


    def setUp(self):
        super(OffloaderOptionsTests, self).setUp()
        # Patch the gsuri lookup so no real config lookup happens; the
        # return value is set per-test in _mock_get_sub_offloader().
        patcher = mock.patch.object(utils, 'get_offload_gsuri')
        self.gsuri_patch = patcher.start()
        self.addCleanup(patcher.stop)

        # Reset module-level flags that individual tests may override.
        gs_offloader.GS_OFFLOADING_ENABLED = True
        gs_offloader.GS_OFFLOADER_MULTIPROCESSING = False


    def _mock_get_sub_offloader(self, is_moblab, multiprocessing=False,
                               console_client=None, delete_age=0):
        """Mock the process of getting the offload_dir function.

        @param is_moblab True to emulate a Moblab deployment, whose
                         gsuri is derived from the image storage server
                         plus fixed fake MAC/UUID values.
        @param multiprocessing Multiprocessing flag to build the real
                         GSOffloader instance with.
        @param console_client Cloud console client to pass through, or
                         None.
        @param delete_age Delete age limit passed to the GSOffloader.
        @return The real GSOffloader instance that the patched
                gs_offloader.GSOffloader constructor will hand back.
        """
        if is_moblab:
            self.expected_gsuri = '%sresults/%s/%s/' % (
                    global_config.global_config.get_config_value(
                            'CROS', 'image_storage_server'),
                    'Fa:ke:ma:c0:12:34', 'rand0m-uu1d')
        else:
            self.expected_gsuri = utils.DEFAULT_OFFLOAD_GSURI
        utils.get_offload_gsuri.return_value = self.expected_gsuri
        # Build the real sub-offloader first, *then* patch the class, so
        # the Offloader constructor receives this instance.
        sub_offloader = gs_offloader.GSOffloader(self.expected_gsuri,
                                                 multiprocessing, delete_age,
                                                 console_client)

        GsOffloader_patcher = mock.patch.object(gs_offloader, 'GSOffloader')
        self.GsOffloader_patch = GsOffloader_patcher.start()
        self.addCleanup(GsOffloader_patcher.stop)

        # cloud_console_client is None when its proto was not compiled
        # (see the guarded import at the top of this file).
        if cloud_console_client:
            console_patcher = mock.patch.object(
                    cloud_console_client, 'is_cloud_notification_enabled')
            self.ccc_notification_patch = console_patcher.start()
            self.addCleanup(console_patcher.stop)

        # NOTE: both branches install the same return_value; they differ
        # only in how is_cloud_notification_enabled is configured.
        if console_client:
            cloud_console_client.is_cloud_notification_enabled.return_value = True
            gs_offloader.GSOffloader.return_value = sub_offloader
        else:
            if cloud_console_client:
                cloud_console_client.is_cloud_notification_enabled.return_value = False
            gs_offloader.GSOffloader.return_value = sub_offloader

        return sub_offloader

    def _verify_sub_offloader(self,
                              is_moblab,
                              multiprocessing=False,
                              console_client=None,
                              delete_age=0):
        """Assert GSOffloader was constructed with the expected args.

        `is_moblab` is accepted for symmetry with
        `_mock_get_sub_offloader()` but is not used here; the expected
        gsuri was already recorded on self.
        """
        if console_client:
            self.GsOffloader_patch.assert_called_with(
                    self.expected_gsuri, multiprocessing, delete_age,
                    IsA(cloud_console_client.PubSubBasedClient))

        else:
            self.GsOffloader_patch.assert_called_with(self.expected_gsuri,
                                                      multiprocessing,
                                                      delete_age, None)

    def test_process_no_options(self):
        """Test default offloader options."""
        sub_offloader = self._mock_get_sub_offloader(False)
        offloader = gs_offloader.Offloader(_get_options([]))
        self.assertEqual(set(offloader._jobdir_classes),
                         self._REGULAR_ONLY)
        self.assertEqual(offloader._processes, 1)
        self.assertEqual(offloader._gs_offloader,
                         sub_offloader)
        self.assertEqual(offloader._upload_age_limit, 0)
        self.assertEqual(offloader._delete_age_limit, 0)
        self._verify_sub_offloader(False)

    def test_process_all_option(self):
        """Test offloader handling for the --all option."""
        sub_offloader = self._mock_get_sub_offloader(False)
        offloader = gs_offloader.Offloader(_get_options(['--all']))
        self.assertEqual(set(offloader._jobdir_classes), self._BOTH)
        self.assertEqual(offloader._processes, 1)
        self.assertEqual(offloader._gs_offloader,
                         sub_offloader)
        self.assertEqual(offloader._upload_age_limit, 0)
        self.assertEqual(offloader._delete_age_limit, 0)
        self._verify_sub_offloader(False)


    def test_process_hosts_option(self):
        """Test offloader handling for the --hosts option."""
        sub_offloader = self._mock_get_sub_offloader(False)
        offloader = gs_offloader.Offloader(
                _get_options(['--hosts']))
        self.assertEqual(set(offloader._jobdir_classes),
                         self._SPECIAL_ONLY)
        self.assertEqual(offloader._processes, 1)
        self.assertEqual(offloader._gs_offloader,
                         sub_offloader)
        self.assertEqual(offloader._upload_age_limit, 0)
        self.assertEqual(offloader._delete_age_limit, 0)
        self._verify_sub_offloader(False)


    def test_parallelism_option(self):
        """Test offloader handling for the --parallelism option."""
        sub_offloader = self._mock_get_sub_offloader(False)
        offloader = gs_offloader.Offloader(
                _get_options(['--parallelism', '2']))
        self.assertEqual(set(offloader._jobdir_classes),
                         self._REGULAR_ONLY)
        self.assertEqual(offloader._processes, 2)
        self.assertEqual(offloader._gs_offloader,
                         sub_offloader)
        self.assertEqual(offloader._upload_age_limit, 0)
        self.assertEqual(offloader._delete_age_limit, 0)
        self._verify_sub_offloader(False)


    def test_delete_only_option(self):
        """Test offloader handling for the --delete_only option."""
        offloader = gs_offloader.Offloader(
                _get_options(['--delete_only']))
        self.assertEqual(set(offloader._jobdir_classes),
                         self._REGULAR_ONLY)
        self.assertEqual(offloader._processes, 1)
        # --delete_only replaces the real offloader with a no-op fake.
        self.assertIsInstance(offloader._gs_offloader,
                              gs_offloader.FakeGSOffloader)
        self.assertEqual(offloader._upload_age_limit, 0)
        self.assertEqual(offloader._delete_age_limit, 0)


    def test_days_old_option(self):
        """Test offloader handling for the --days_old option."""
        sub_offloader = self._mock_get_sub_offloader(False, delete_age=7)
        offloader = gs_offloader.Offloader(
                _get_options(['--days_old', '7']))
        self.assertEqual(set(offloader._jobdir_classes),
                         self._REGULAR_ONLY)
        self.assertEqual(offloader._processes, 1)
        self.assertEqual(offloader._gs_offloader,
                         sub_offloader)
        self.assertEqual(offloader._upload_age_limit, 7)
        self.assertEqual(offloader._delete_age_limit, 7)
        self._verify_sub_offloader(False, delete_age=7)


    def test_moblab_gsuri_generation(self):
        """Test offloader construction for Moblab."""
        sub_offloader = self._mock_get_sub_offloader(True)
        offloader = gs_offloader.Offloader(_get_options([]))
        self.assertEqual(set(offloader._jobdir_classes),
                         self._REGULAR_ONLY)
        self.assertEqual(offloader._processes, 1)
        self.assertEqual(offloader._gs_offloader,
                         sub_offloader)
        self.assertEqual(offloader._upload_age_limit, 0)
        self.assertEqual(offloader._delete_age_limit, 0)
        self._verify_sub_offloader(True)


    def test_globalconfig_offloading_flag(self):
        """Test enabling of --delete_only via global_config."""
        gs_offloader.GS_OFFLOADING_ENABLED = False
        offloader = gs_offloader.Offloader(
                _get_options([]))
        self.assertIsInstance(offloader._gs_offloader,
                             gs_offloader.FakeGSOffloader)

    def test_offloader_multiprocessing_flag_set(self):
        """Test multiprocessing is enabled via the -m flag."""
        sub_offloader = self._mock_get_sub_offloader(True, True)
        offloader = gs_offloader.Offloader(_get_options(['-m']))
        self.assertEqual(offloader._gs_offloader,
                         sub_offloader)
        self._verify_sub_offloader(True, True)

    def test_offloader_multiprocessing_flag_not_set_default_false(self):
        """Test multiprocessing defaults off when the config flag is False."""
        gs_offloader.GS_OFFLOADER_MULTIPROCESSING = False
        sub_offloader = self._mock_get_sub_offloader(True, False)
        offloader = gs_offloader.Offloader(_get_options([]))
        self.assertEqual(offloader._gs_offloader,
                         sub_offloader)
        self._verify_sub_offloader(True, False)

    def test_offloader_multiprocessing_flag_not_set_default_true(self):
        """Test multiprocessing defaults on when the config flag is True."""
        gs_offloader.GS_OFFLOADER_MULTIPROCESSING = True
        sub_offloader = self._mock_get_sub_offloader(True, True)
        offloader = gs_offloader.Offloader(_get_options([]))
        self.assertEqual(offloader._gs_offloader,
                         sub_offloader)
        self._verify_sub_offloader(True, True)


    def test_offloader_pubsub_enabled(self):
        """Test wiring of the pubsub-based cloud console client."""
        # Skip when the cloud_console_client proto was not compiled.
        if not cloud_console_client:
            return
        with mock.patch.object(pubsub_utils, "PubSubClient"):
            sub_offloader = self._mock_get_sub_offloader(
                    True, False, cloud_console_client.PubSubBasedClient())
            offloader = gs_offloader.Offloader(_get_options([]))
            self.assertEqual(offloader._gs_offloader, sub_offloader)
            self._verify_sub_offloader(
                    True, False, cloud_console_client.PubSubBasedClient())
302
303
class _MockJobDirectory(job_directories._JobDirectory):
    """Subclass of `_JobDirectory` used as a helper for tests.

    NOTE(review): `offload_count`, `first_offload_start` and `dirname`
    used below are presumably inherited from `_JobDirectory` — confirm
    against job_directories.py.
    """

    # Matches regular job directory names such as '118-debug'.
    GLOB_PATTERN = '[0-9]*-*'


    def __init__(self, resultsdir):
        """Create new job in initial state."""
        super(_MockJobDirectory, self).__init__(resultsdir)
        # Finish timestamp; None means "not finished yet".
        self._timestamp = None
        # [source dir, destination subpath, finish timestamp] — the
        # arguments tests pass to GSOffloader.offload().
        self.queue_args = [resultsdir, os.path.dirname(resultsdir), self._timestamp]


    def get_timestamp_if_finished(self):
        """Return the finish timestamp, or None if not finished."""
        return self._timestamp


    def set_finished(self, days_old):
        """Make this job appear to be finished.

        After calling this function, calls to `enqueue_offload()`
        will find this job as finished, but not expired and ready
        for offload.  Note that when `days_old` is 0,
        `enqueue_offload()` will treat a finished job as eligible
        for offload.

        @param days_old The value of the `days_old` parameter that
                        will be passed to `enqueue_offload()` for
                        testing.

        """
        self._timestamp = jd_test.make_timestamp(days_old, False)
        self.queue_args[2] = self._timestamp


    def set_expired(self, days_old):
        """Make this job eligible to be offloaded.

        After calling this function, calls to `offload` will attempt
        to offload this job.

        @param days_old The value of the `days_old` parameter that
                        will be passed to `enqueue_offload()` for
                        testing.

        """
        self._timestamp = jd_test.make_timestamp(days_old, True)
        self.queue_args[2] = self._timestamp


    def set_incomplete(self):
        """Make this job appear to have failed offload just once."""
        self.offload_count += 1
        self.first_offload_start = time.time()
        # Recreate the directory so the "offload failed" state is
        # visible on disk as well.
        if not os.path.isdir(self.dirname):
            os.mkdir(self.dirname)


    def set_reportable(self):
        """Make this job be reportable."""
        self.set_incomplete()
        self.offload_count += 1


    def set_complete(self):
        """Make this job be completed."""
        self.offload_count += 1
        # A completed offload removes the results directory.
        if os.path.isdir(self.dirname):
            os.rmdir(self.dirname)


    def process_gs_instructions(self):
        """Always still offload the job directory."""
        return True
378
379
class CommandListTests(unittest.TestCase):
    """Tests for `_get_cmd_list()`."""

    def _command_list_assertions(self, job, use_rsync=True, multi=False):
        """Call `_get_cmd_list()` and check the return value.

        Check the following assertions:
          * The command name (argv[0]) is 'gsutil'.
          * '-m' option (argv[1]) is on when the argument, multi, is True.
          * The arguments contain the 'rsync' or 'cp' subcommand,
            matching `use_rsync`.
          * The next-to-last argument (the source directory) is the
            job's `queue_args[0]`.
          * The last argument (the destination URL) ends with the
            job's `queue_args[0]` when rsync is used, or with
            `queue_args[1]` when 'cp' is used.
          * The finish command copies /dev/null to a
            '.finished_offload' marker under the destination path.

        @param job A job with properly calculated arguments to
                   `_get_cmd_list()`
        @param use_rsync True when using 'rsync'. False when using 'cp'.
        @param multi True when using '-m' option for gsutil.

        """
        test_bucket_uri = 'gs://a-test-bucket'

        gs_offloader.USE_RSYNC_ENABLED = use_rsync

        gs_path = os.path.join(test_bucket_uri, job.queue_args[1])

        command = gs_offloader._get_cmd_list(
                multi, job.queue_args[0], gs_path)

        self.assertEqual(command[0], 'gsutil')
        if multi:
            self.assertEqual(command[1], '-m')
        self.assertEqual(command[-2], job.queue_args[0])

        if use_rsync:
            self.assertTrue('rsync' in command)
            self.assertEqual(command[-1],
                             os.path.join(test_bucket_uri, job.queue_args[0]))
        else:
            self.assertTrue('cp' in command)
            self.assertEqual(command[-1],
                             os.path.join(test_bucket_uri, job.queue_args[1]))

        # The finish command uploads an empty '.finished_offload' marker.
        finish_command = gs_offloader._get_finish_cmd_list(gs_path)
        self.assertEqual(finish_command[0], 'gsutil')
        self.assertEqual(finish_command[1], 'cp')
        self.assertEqual(finish_command[2], '/dev/null')
        self.assertEqual(finish_command[3],
                         os.path.join(gs_path, '.finished_offload'))


    def test__get_cmd_list_regular(self):
        """Test `_get_cmd_list()` as for a regular job."""
        job = _MockJobDirectory('118-debug')
        self._command_list_assertions(job)


    def test__get_cmd_list_special(self):
        """Test `_get_cmd_list()` as for a special job."""
        job = _MockJobDirectory('hosts/host1/118-reset')
        self._command_list_assertions(job)


    def test_get_cmd_list_regular_no_rsync(self):
        """Test `_get_cmd_list()` for a regular job with rsync disabled."""
        job = _MockJobDirectory('118-debug')
        self._command_list_assertions(job, use_rsync=False)


    def test_get_cmd_list_special_no_rsync(self):
        """Test `_get_cmd_list()` for a special job with rsync disabled."""
        job = _MockJobDirectory('hosts/host1/118-reset')
        self._command_list_assertions(job, use_rsync=False)


    def test_get_cmd_list_regular_multi(self):
        """Test `_get_cmd_list()` as for a regular job with True multi."""
        job = _MockJobDirectory('118-debug')
        self._command_list_assertions(job, multi=True)


    def test__get_cmd_list_special_multi(self):
        """Test `_get_cmd_list()` as for a special job with True multi."""
        job = _MockJobDirectory('hosts/host1/118-reset')
        self._command_list_assertions(job, multi=True)
466
467
class _TempResultsDirTestCase(unittest.TestCase):
    """Mixin class for tests using a temporary results directory."""

    # Sample regular-job result directory names.
    REGULAR_JOBLIST = [
        '111-fubar', '112-fubar', '113-fubar', '114-snafu']
    # Hosts under which special-task directories are created.
    HOST_LIST = ['host1', 'host2', 'host3']
    # Sample special-task result directory names.
    SPECIAL_JOBLIST = [
        'hosts/host1/333-reset', 'hosts/host1/334-reset',
        'hosts/host2/444-reset', 'hosts/host3/555-reset']


    def setUp(self):
        super(_TempResultsDirTestCase, self).setUp()
        # Each test runs chdir'ed into its own scratch results root.
        self._resultsroot = tempfile.mkdtemp()
        self._cwd = os.getcwd()
        os.chdir(self._resultsroot)


    def tearDown(self):
        # Leave the scratch tree before deleting it, then restore cwd.
        os.chdir(self._cwd)
        shutil.rmtree(self._resultsroot)
        super(_TempResultsDirTestCase, self).tearDown()


    def make_job(self, jobdir):
        """Create a job with results in `self._resultsroot`.

        @param jobdir Name of the subdirectory to be created in
                      `self._resultsroot`.
        """
        os.makedirs(jobdir)
        return _MockJobDirectory(jobdir)


    def make_job_hierarchy(self):
        """Create a sample hierarchy of job directories.

        `self.REGULAR_JOBLIST` is a list of directories for regular
        jobs to be created; `self.SPECIAL_JOBLIST` is a list of
        directories for special jobs to be created.
        """
        for jobdir in self.REGULAR_JOBLIST:
            os.mkdir(jobdir)
        os.mkdir('hosts')
        for host in self.HOST_LIST:
            os.mkdir(os.path.join('hosts', host))
        for jobdir in self.SPECIAL_JOBLIST:
            os.mkdir(jobdir)
519
520
class _TempResultsDirTestBase(_TempResultsDirTestCase, unittest.TestCase):
    """Concrete base class for tests needing a temporary results directory."""
523
524
class FailedOffloadsLogTest(_TempResultsDirTestBase):
    """Test the formatting of failed offloads log file."""
    # Below is partial sample of a failed offload log file.  This text is
    # deliberately hard-coded and then parsed to create the test data; the idea
    # is to make sure the actual text format will be reviewed by a human being.
    #
    # first offload      count  directory
    # --+----1----+----  ----+ ----+----1----+----2----+----3
    _SAMPLE_DIRECTORIES_REPORT = '''\
    =================== ======  ==============================
    2014-03-14 15:09:26      1  118-fubar
    2014-03-14 15:19:23      2  117-fubar
    2014-03-14 15:29:20      6  116-fubar
    2014-03-14 15:39:17     24  115-fubar
    2014-03-14 15:49:14    120  114-fubar
    2014-03-14 15:59:11    720  113-fubar
    2014-03-14 16:09:08   5040  112-fubar
    2014-03-14 16:19:05  40320  111-fubar
    '''

    def setUp(self):
        super(FailedOffloadsLogTest, self).setUp()
        self._offloader = gs_offloader.Offloader(_get_options([]))
        self._joblist = []
        # Parse the sample report (skipping the '==' header separator and
        # the trailing blank line) back into mock job objects.
        for line in self._SAMPLE_DIRECTORIES_REPORT.split('\n')[1 : -1]:
            date_, time_, count, dir_ = line.split()
            job = _MockJobDirectory(dir_)
            job.offload_count = int(count)
            timestruct = time.strptime("%s %s" % (date_, time_),
                                       gs_offloader.FAILED_OFFLOADS_TIME_FORMAT)
            job.first_offload_start = time.mktime(timestruct)
            # enter the jobs in reverse order, to make sure we
            # test that the output will be sorted.
            self._joblist.insert(0, job)


    def assert_report_well_formatted(self, report_file):
        """Assert that report file is well formatted.

        @param report_file: Path to report file
        """
        with open(report_file, 'r') as f:
            report_lines = f.read().split()

        # Find the '==...' separator that ends the report header.  The
        # previous implementation asserted `end_of_header_index <
        # len(report_lines)` after the loop, which could never fail
        # because the loop index tops out at len(report_lines) - 1;
        # the for/else construct below fails explicitly instead.
        for end_of_header_index in range(len(report_lines)):
            if report_lines[end_of_header_index].startswith('=='):
                break
        else:
            self.fail('Failed to find end-of-header marker in the report')

        relevant_lines = report_lines[end_of_header_index:]
        expected_lines = self._SAMPLE_DIRECTORIES_REPORT.split()
        self.assertListEqual(relevant_lines, expected_lines)


    def test_failed_offload_log_format(self):
        """Trigger a failure report and check its contents."""
        log_file = os.path.join(self._resultsroot, 'failed_log')
        self._offloader._log_failed_jobs_locally(self._joblist,
                                                 log_file=log_file)
        self.assert_report_well_formatted(log_file)


    def test_failed_offload_file_overwrite(self):
        """Verify that we can safely overwrite the log file."""
        log_file = os.path.join(self._resultsroot, 'failed_log')
        with open(log_file, 'w') as f:
            f.write('boohoohoo')
        self._offloader._log_failed_jobs_locally(self._joblist,
                                                 log_file=log_file)
        self.assert_report_well_formatted(log_file)
596
597
598class OffloadDirectoryTests(_TempResultsDirTestBase):
599    """Tests for `offload_dir()`."""
600
    def setUp(self):
        """Silence logging and install the mocks shared by all tests."""
        super(OffloadDirectoryTests, self).setUp()
        # offload_dir() logs messages; silence them.
        self._saved_loglevel = logging.getLogger().getEffectiveLevel()
        logging.getLogger().setLevel(logging.CRITICAL+1)
        self._job = self.make_job(self.REGULAR_JOBLIST[0])

        # Replace _get_cmd_list() so no real gsutil command line is built.
        cmd_list_patcher = mock.patch.object(gs_offloader, '_get_cmd_list')
        cmd_list_patch = cmd_list_patcher.start()
        self.addCleanup(cmd_list_patcher.stop)

        # Disable the offload timeout alarm.
        alarm = mock.patch('signal.alarm', return_value=0)
        alarm.start()
        self.addCleanup(alarm.stop)

        # NOTE(review): this patches models.test.parse_job_keyval, but
        # reuses (shadows) the cmd_list_* variable names from above; the
        # returned patch object is unused.
        cmd_list_patcher = mock.patch.object(models.test, 'parse_job_keyval')
        cmd_list_patch = cmd_list_patcher.start()
        self.addCleanup(cmd_list_patcher.stop)

        # Whether _run_offload_dir() should expect the swarming request
        # directory to have been pruned.  ('sarming' is a pre-existing
        # typo, kept because its uses elsewhere depend on the name.)
        self.should_remove_sarming_req_dir = False
621
622
623    def tearDown(self):
624        logging.getLogger().setLevel(self._saved_loglevel)
625        super(OffloadDirectoryTests, self).tearDown()
626
    def _mock_create_marker_file(self):
        """Patch builtins.open so marker-file creation is a no-op."""
        open_patcher = mock.patch.object(six.moves.builtins, 'open')
        open_patch = open_patcher.start()
        self.addCleanup(open_patcher.stop)

        # `open` now resolves to the MagicMock installed above; give it a
        # file-like return value for the offloader's `open(...)` call.
        open.return_value = mock.MagicMock()
633
634
635    def _mock_offload_dir_calls(self, command, queue_args,
636                                marker_initially_exists=False):
637        """Mock out the calls needed by `offload_dir()`.
638
639        This covers only the calls made when there is no timeout.
640
641        @param command Command list to be returned by the mocked
642                       call to `_get_cmd_list()`.
643
644        """
645        isfile_patcher = mock.patch.object(os.path, 'isfile')
646        isfile_patcher.start()
647        self.addCleanup(isfile_patcher.stop)
648
649        os.path.isfile.return_value = marker_initially_exists
650        command.append(queue_args[0])
651        gs_offloader._get_cmd_list(
652                False, queue_args[0],
653                '%s%s' % (utils.DEFAULT_OFFLOAD_GSURI,
654                          queue_args[1])).AndReturn(command)
655
656
    def _run_offload_dir(self, should_succeed, delete_age):
        """Make one call to `offload_dir()`.

        The caller ensures all mocks are set up already.

        @param should_succeed True iff the call to `offload_dir()`
                              is expected to succeed and remove the
                              offloaded job directory.
        @param delete_age Delete age limit (days) to construct the
                              GSOffloader under test with.

        """
        gs_offloader.GSOffloader(
                utils.DEFAULT_OFFLOAD_GSURI, False, delete_age).offload(
                        self._job.queue_args[0],
                        self._job.queue_args[1],
                        self._job.queue_args[2])
        # On success the job directory is removed; on failure it remains.
        self.assertEqual(not should_succeed,
                         os.path.isdir(self._job.queue_args[0]))
        swarming_req_dir = gs_offloader._get_swarming_req_dir(
                self._job.queue_args[0])
        if swarming_req_dir:
            # The swarming request dir must be pruned exactly when the
            # test flagged it via should_remove_sarming_req_dir.
            self.assertEqual(not self.should_remove_sarming_req_dir,
                             os.path.exists(swarming_req_dir))
679
680
    def test_offload_success(self):
        """Test that `offload_dir()` can succeed correctly."""
        # 'test -d <dir>' exits 0 for an existing directory, so the
        # mocked gsutil command succeeds.
        self._mock_offload_dir_calls(['test', '-d'],
                                     self._job.queue_args)
        os.path.isfile.return_value = True
        self._mock_create_marker_file()
        self._run_offload_dir(True, 0)
688
689
    def test_offload_failure(self):
        """Test that `offload_dir()` can fail correctly."""
        # 'test ! -d <dir>' exits non-zero for an existing directory, so
        # the mocked gsutil command fails.
        self._mock_offload_dir_calls(['test', '!', '-d'],
                                     self._job.queue_args)
        self._run_offload_dir(False, 0)
695
696
    def test_offload_swarming_req_dir_remove(self):
        """Test that `offload_dir()` can prune the empty swarming task dir."""
        # The swarming request dir contains only job '1', so offloading
        # it leaves the request dir empty and eligible for pruning.
        should_remove = os.path.join('results', 'swarming-123abc0')
        self._job = self.make_job(os.path.join(should_remove, '1'))
        self._mock_offload_dir_calls(['test', '-d'],
                                     self._job.queue_args)

        os.path.isfile.return_value = True
        self.should_remove_sarming_req_dir = True
        self._mock_create_marker_file()
        self._run_offload_dir(True, 0)
708
709
    def test_offload_swarming_req_dir_exist(self):
        """Test that `offload_dir()` keeps the non-empty swarming task dir."""
        # Job '2' remains after job '1' is offloaded, so the request dir
        # stays non-empty and must not be pruned.
        should_not_remove = os.path.join('results', 'swarming-456edf0')
        self._job = self.make_job(os.path.join(should_not_remove, '1'))
        self.make_job(os.path.join(should_not_remove, '2'))
        self._mock_offload_dir_calls(['test', '-d'],
                                     self._job.queue_args)

        os.path.isfile.return_value = True
        self.should_remove_sarming_req_dir = False
        self._mock_create_marker_file()
        self._run_offload_dir(True, 0)
722
723
    def test_sanitize_dir(self):
        """Test that folder/file name with invalid character can be corrected.

        Builds a tree containing gsutil-unsafe names ('[', ']', '*',
        '?', '#'), broken symlinks and FIFOs, runs sanitize_dir(), and
        verifies every surviving name is escaped and every FIFO/symlink
        was replaced by a regular file.
        """
        results_folder = tempfile.mkdtemp()
        invalid_chars = '_'.join(['[', ']', '*', '?', '#'])
        invalid_files = []
        invalid_folder_name = 'invalid_name_folder_%s' % invalid_chars
        invalid_folder = os.path.join(
                results_folder,
                invalid_folder_name)
        invalid_files.append(os.path.join(
                invalid_folder,
                'invalid_name_file_%s' % invalid_chars))
        good_folder = os.path.join(results_folder, 'valid_name_folder')
        good_file = os.path.join(good_folder, 'valid_name_file')
        for folder in [invalid_folder, good_folder]:
            os.makedirs(folder)
        for f in invalid_files + [good_file]:
            with open(f, 'w'):
                pass
        # check that broken symlinks don't break sanitization
        symlink = os.path.join(invalid_folder, 'broken-link')
        os.symlink(os.path.join(results_folder, 'no-such-file'),
                   symlink)
        # FIFOs in good, bad, and bad-with-bad-name locations.
        fifo1 = os.path.join(results_folder, 'test_fifo1')
        fifo2 = os.path.join(good_folder, 'test_fifo2')
        fifo3 = os.path.join(invalid_folder, 'test_fifo3')
        invalid_fifo4_name = 'test_fifo4_%s' % invalid_chars
        fifo4 = os.path.join(invalid_folder, invalid_fifo4_name)
        os.mkfifo(fifo1)
        os.mkfifo(fifo2)
        os.mkfifo(fifo3)
        os.mkfifo(fifo4)
        gs_offloader.sanitize_dir(results_folder)
        # After sanitizing, every remaining name must be fully escaped.
        for _, dirs, files in os.walk(results_folder):
            for name in dirs + files:
                self.assertEqual(name, gslib.escape(name))
                for c in name:
                    self.assertFalse(c in ['[', ']', '*', '?', '#'])
        self.assertTrue(os.path.exists(good_file))

        # FIFOs keep their (escaped) names but become regular files.
        self.assertTrue(os.path.exists(fifo1))
        self.assertFalse(is_fifo(fifo1))
        self.assertTrue(os.path.exists(fifo2))
        self.assertFalse(is_fifo(fifo2))
        corrected_folder = os.path.join(
                results_folder, gslib.escape(invalid_folder_name))
        corrected_fifo3 = os.path.join(
                corrected_folder,
                'test_fifo3')
        self.assertFalse(os.path.exists(fifo3))
        self.assertTrue(os.path.exists(corrected_fifo3))
        self.assertFalse(is_fifo(corrected_fifo3))
        corrected_fifo4 = os.path.join(
                corrected_folder, gslib.escape(invalid_fifo4_name))
        self.assertFalse(os.path.exists(fifo4))
        self.assertTrue(os.path.exists(corrected_fifo4))
        self.assertFalse(is_fifo(corrected_fifo4))

        # The broken symlink moves with its folder and is replaced by a
        # regular (non-link) file.
        corrected_symlink = os.path.join(
                corrected_folder,
                'broken-link')
        self.assertFalse(os.path.lexists(symlink))
        self.assertTrue(os.path.exists(corrected_symlink))
        self.assertFalse(os.path.islink(corrected_symlink))
        shutil.rmtree(results_folder)
790
791
792    def check_limit_file_count(self, is_test_job=True):
793        """Test that folder with too many files can be compressed.
794
795        @param is_test_job: True to check the method with test job result
796                            folder. Set to False for special task folder.
797        """
798        results_folder = tempfile.mkdtemp()
799        host_folder = os.path.join(
800                results_folder,
801                'lab1-host1' if is_test_job else 'hosts/lab1-host1/1-repair')
802        debug_folder = os.path.join(host_folder, 'debug')
803        sysinfo_folder = os.path.join(host_folder, 'sysinfo')
804        for folder in [debug_folder, sysinfo_folder]:
805            os.makedirs(folder)
806            for i in range(10):
807                with open(os.path.join(folder, str(i)), 'w') as f:
808                    f.write('test')
809
810        gs_offloader._MAX_FILE_COUNT = 100
811        gs_offloader.limit_file_count(
812                results_folder if is_test_job else host_folder)
813        self.assertTrue(os.path.exists(sysinfo_folder))
814
815        gs_offloader._MAX_FILE_COUNT = 10
816        gs_offloader.limit_file_count(
817                results_folder if is_test_job else host_folder)
818        self.assertFalse(os.path.exists(sysinfo_folder))
819        self.assertTrue(os.path.exists(sysinfo_folder + '.tgz'))
820        self.assertTrue(os.path.exists(debug_folder))
821
822        shutil.rmtree(results_folder)
823
824
825    def test_limit_file_count(self):
826        """Test that folder with too many files can be compressed.
827        """
828        self.check_limit_file_count(is_test_job=True)
829        self.check_limit_file_count(is_test_job=False)
830
831
832    def test_get_metrics_fields(self):
833        """Test method _get_metrics_fields."""
834        results_folder, host_folder = self._create_results_folder()
835        models.test.parse_job_keyval.return_value = ({
836                'build': 'veyron_minnie-cheets-release/R52-8248.0.0',
837                'parent_job_id': 'p_id',
838                'suite': 'arc-cts'
839        })
840        try:
841            self.assertEqual({'board': 'veyron_minnie-cheets',
842                              'milestone': 'R52'},
843                             gs_offloader._get_metrics_fields(host_folder))
844        finally:
845            shutil.rmtree(results_folder)
846
847
848    def _create_results_folder(self):
849        results_folder = tempfile.mkdtemp()
850        host_folder = os.path.join(results_folder, 'chromeos4-row9-rack11-host22')
851
852        # Build host keyvals set to parse model info.
853        host_info_path = os.path.join(host_folder, 'host_keyvals')
854        dir_to_create = '/'
855        for tdir in host_info_path.split('/'):
856            dir_to_create = os.path.join(dir_to_create, tdir)
857            if not os.path.exists(dir_to_create):
858                os.mkdir(dir_to_create)
859        with open(os.path.join(host_info_path, 'chromeos4-row9-rack11-host22'), 'w') as store_file:
860            store_file.write('labels=board%3Acoral,hw_video_acc_vp9,cros,'+
861                             'hw_jpeg_acc_dec,bluetooth,model%3Arobo360,'+
862                             'accel%3Acros-ec,'+
863                             'sku%3Arobo360_IntelR_CeleronR_CPU_N3450_1_10GHz_4Gb')
864
865        # .autoserv_execute file is needed for the test results package to look
866        # legit.
867        autoserve_path = os.path.join(host_folder, '.autoserv_execute')
868        with open(autoserve_path, 'w') as temp_file:
869            temp_file.write(' ')
870
871        return (results_folder, host_folder)
872
873
class OffladerConfigTests(_TempResultsDirTestBase):
    """Tests for the `Offloader` to follow side_effect config."""

    # (object, attribute) pairs that every test needs patched.
    _PATCH_TARGETS = (
            (gs_offloader, '_get_metrics_fields'),
            (gs_offloader, '_OffloadError'),
            (gs_offloader, '_emit_offload_metrics'),
            (gs_offloader, '_get_cmd_list'),
            (subprocess, 'Popen'),
            (gs_offloader, '_emit_gs_returncode_metric'),
    )

    def setUp(self):
        super(OffladerConfigTests, self).setUp()
        gs_offloader.GS_OFFLOADING_ENABLED = True
        gs_offloader.GS_OFFLOADER_MULTIPROCESSING = True
        self.dest_path = '/results'
        # Patch all collaborators in one pass; each patch is undone
        # automatically when the test finishes.
        for target, attribute in self._PATCH_TARGETS:
            patcher = mock.patch.object(target, attribute)
            patcher.start()
            self.addCleanup(patcher.stop)

    def _run(self, results_dir, gs_bucket, expect_dest):
        """Write a side_effects config and drive one offload attempt."""
        stdout = os.path.join(results_dir, 'std.log')
        stderr = os.path.join(results_dir, 'std.err')
        config = {
            'tko': {
                'proxy_socket': '/file-system/foo-socket',
                'mysql_user': 'foo-user',
                'mysql_password_file': '/file-system/foo-password-file'
            },
            'google_storage': {
                'bucket': gs_bucket,
                'credentials_file': '/foo-creds'
            },
            'this_field_is_ignored': True
        }
        config_path = os.path.join(results_dir, 'side_effects_config.json')
        with open(config_path, 'w') as config_file:
            json.dump(config, config_file)
        gs_offloader._get_metrics_fields(results_dir)
        gs_offloader._get_cmd_list.return_value = ['test', '-d', expect_dest]
        subprocess.Popen.side_effect = [
                _get_fake_process(), _get_fake_process()
        ]
        gs_offloader._OffloadError(mock.ANY)
        gs_offloader._emit_gs_returncode_metric.return_value = True
        gs_offloader._emit_offload_metrics.return_value = True
        sub_offloader = gs_offloader.GSOffloader(results_dir, True, 0, None)
        sub_offloader._try_offload(results_dir, self.dest_path, stdout, stderr)
        shutil.rmtree(results_dir)

    def _verify(self, results_dir, gs_bucket, expect_dest):
        """Check that the offload used the expected destination."""
        gs_offloader._get_cmd_list.assert_called_with(True, mock.ANY,
                                                      expect_dest)
        expected_call = mock.call(
                mock.ANY,
                stdout=os.path.join(results_dir, 'std.log'),
                stderr=os.path.join(results_dir, 'std.err'))
        # The offloader shells out twice with identical redirections.
        subprocess.Popen.assert_has_calls([expected_call, expected_call])

    def test_skip_gs_prefix(self):
        """Test skip the 'gs://' prefix if already presented."""
        results_dir = tempfile.mkdtemp()
        gs_bucket = 'gs://prod-bucket'
        expect_dest = gs_bucket + self.dest_path
        self._run(results_dir, gs_bucket, expect_dest)
        self._verify(results_dir, gs_bucket, expect_dest)
958
959
class JobDirectoryOffloadTests(_TempResultsDirTestBase):
    """Tests for `_JobDirectory.enqueue_offload()`.

    When testing with a `days_old` parameter of 0, `set_finished()`
    is used instead of `set_expired()`, which puts the job's
    timestamp in the future.  This exercises the requirement that a
    `days_old` of 0 makes a job eligible for offload no matter what
    its timestamp says.

    Testing covers the following assertions:
     A. Each time `enqueue_offload()` is called, a message that
        includes the job's directory name will be logged using
        `logging.debug()`, regardless of whether the job was
        enqueued.  Nothing else is allowed to be logged.
     B. If the job is not eligible to be offloaded,
        `first_offload_start` and `offload_count` are 0.
     C. If the job is not eligible for offload, nothing is
        enqueued in `queue`.
     D. When the job is offloaded, `offload_count` increments
        each time.
     E. When the job is offloaded, the appropriate parameters are
        enqueued exactly once.
     F. The first time a job is offloaded, `first_offload_start` is
        set to the current time.
     G. `first_offload_start` only changes the first time that the
        job is offloaded.

    The test cases below are designed to exercise all of the
    meaningful state transitions at least once.

    """

    def setUp(self):
        super(JobDirectoryOffloadTests, self).setUp()
        self._queue = six.moves.queue.Queue()
        self._job = self.make_job(self.REGULAR_JOBLIST[0])


    def _offload_unexpired_job(self, days_old):
        """Call `enqueue_offload()` twice for an unexpired job.

        Exercises assertions B and C: the calls must have no effect
        on the job or on the queue.

        """
        self.assertEqual(0, self._job.offload_count)
        self.assertEqual(0, self._job.first_offload_start)
        for _ in range(2):
            gs_offloader._enqueue_offload(self._job, self._queue, days_old)
        self.assertTrue(self._queue.empty())
        self.assertEqual(0, self._job.offload_count)
        self.assertEqual(0, self._job.first_offload_start)


    def _offload_expired_once(self, days_old, count):
        """Call `enqueue_offload()` once for an expired job.

        Exercises assertions D and E regarding the side effects
        expected when a job is offloaded.

        """
        gs_offloader._enqueue_offload(self._job, self._queue, days_old)
        self.assertEqual(count, self._job.offload_count)
        self.assertFalse(self._queue.empty())
        queued_args = self._queue.get_nowait()
        self.assertTrue(self._queue.empty())
        self.assertEqual(self._job.queue_args, queued_args)


    def _offload_expired_job(self, days_old):
        """Call `enqueue_offload()` repeatedly for a just-expired job.

        Directly exercises assertions F and G regarding the behavior
        of `first_offload_start`.

        """
        before = time.time()
        self._offload_expired_once(days_old, 1)
        first_start = self._job.first_offload_start
        self.assertGreaterEqual(first_start, before)
        self.assertLessEqual(first_start, time.time())
        # Subsequent offloads must not move first_offload_start.
        for attempt in (2, 3):
            self._offload_expired_once(days_old, attempt)
            self.assertEqual(first_start, self._job.first_offload_start)


    def test_case_1_no_expiration(self):
        """Run `enqueue_offload()` with a `days_old` of 0.

        Covers calls made both before and after the job becomes
        expired.

        """
        self._offload_unexpired_job(0)
        self._job.set_finished(0)
        self._offload_expired_job(0)


    def test_case_2_no_expiration(self):
        """Run `enqueue_offload()` with a `days_old` of 0.

        Covers calls made only after the job becomes expired.

        """
        self._job.set_finished(0)
        self._offload_expired_job(0)


    def test_case_1_with_expiration(self):
        """Run `enqueue_offload()` with a non-zero `days_old`.

        Covers calls made before the job finishes, before the job
        expires, and after the job expires.

        """
        self._offload_unexpired_job(_TEST_EXPIRATION_AGE)
        self._job.set_finished(_TEST_EXPIRATION_AGE)
        self._offload_unexpired_job(_TEST_EXPIRATION_AGE)
        self._job.set_expired(_TEST_EXPIRATION_AGE)
        self._offload_expired_job(_TEST_EXPIRATION_AGE)


    def test_case_2_with_expiration(self):
        """Run `enqueue_offload()` with a non-zero `days_old`.

        Covers calls made between finishing and expiration, and
        after the job expires.

        """
        self._job.set_finished(_TEST_EXPIRATION_AGE)
        self._offload_unexpired_job(_TEST_EXPIRATION_AGE)
        self._job.set_expired(_TEST_EXPIRATION_AGE)
        self._offload_expired_job(_TEST_EXPIRATION_AGE)


    def test_case_3_with_expiration(self):
        """Run `enqueue_offload()` with a non-zero `days_old`.

        Covers calls made only before finishing and after
        expiration.

        """
        self._offload_unexpired_job(_TEST_EXPIRATION_AGE)
        self._job.set_expired(_TEST_EXPIRATION_AGE)
        self._offload_expired_job(_TEST_EXPIRATION_AGE)


    def test_case_4_with_expiration(self):
        """Run `enqueue_offload()` with a non-zero `days_old`.

        Covers calls made only after expiration.

        """
        self._job.set_expired(_TEST_EXPIRATION_AGE)
        self._offload_expired_job(_TEST_EXPIRATION_AGE)
1119
1120
class GetJobDirectoriesTests(_TempResultsDirTestBase):
    """Tests for `_JobDirectory.get_job_directories()`."""

    def setUp(self):
        super(GetJobDirectoriesTests, self).setUp()
        self.make_job_hierarchy()
        # Entries that must never be reported as job directories:
        # a directory whose name isn't a job id, and a plain file.
        os.mkdir('not-a-job')
        with open('not-a-dir', 'w'):
            pass


    def _run_get_directories(self, cls, expected_list):
        """Test `get_job_directories()` for the given class.

        Calls the method and asserts that the returned directories
        match the expected set, ignoring order.

        @param cls Job directory class under test.
        @param expected_list Expected return value from the call.
        """
        actual = cls.get_job_directories()
        self.assertEqual(set(expected_list), set(actual))


    def test_get_regular_jobs(self):
        """Test `RegularJobDirectory.get_job_directories()`."""
        self._run_get_directories(job_directories.RegularJobDirectory,
                                  self.REGULAR_JOBLIST)


    def test_get_special_jobs(self):
        """Test `SpecialJobDirectory.get_job_directories()`."""
        self._run_get_directories(job_directories.SpecialJobDirectory,
                                  self.SPECIAL_JOBLIST)
1153
1154
class AddJobsTests(_TempResultsDirTestBase):
    """Tests for `Offloader._add_new_jobs()`."""

    MOREJOBS = ['115-fubar', '116-fubar', '117-fubar', '118-snafu']

    def setUp(self):
        super(AddJobsTests, self).setUp()
        self._initial_job_names = set(self.REGULAR_JOBLIST).union(
                self.SPECIAL_JOBLIST)
        self.make_job_hierarchy()
        self._offloader = gs_offloader.Offloader(_get_options(['-a']))

        debug_patcher = mock.patch.object(logging, 'debug')
        self.logging_patch = debug_patcher.start()
        self.addCleanup(debug_patcher.stop)

    def _run_add_new_jobs(self, expected_key_set):
        """Basic test assertions for `_add_new_jobs()`.

        Asserts the following:
          * The keys in the offloader's `_open_jobs` dictionary
            match the expected set of keys.
          * Every job in `_open_jobs` is stored under its own
            directory name.
          * The number of newly added jobs is logged.

        """
        expected_new_count = (len(expected_key_set) -
                              len(self._offloader._open_jobs))
        self._offloader._add_new_jobs()
        self.assertEqual(set(self._offloader._open_jobs),
                         expected_key_set)
        for dirname, job in self._offloader._open_jobs.items():
            self.assertEqual(job.dirname, dirname)

        self.logging_patch.assert_called_with(mock.ANY, expected_new_count)

    def test_add_jobs_empty(self):
        """Test adding jobs to an empty dictionary.

        Calls the offloader's `_add_new_jobs()` once, starting from
        an empty `_open_jobs` dictionary, and performs the standard
        assertions.

        """
        self._run_add_new_jobs(self._initial_job_names)


    def test_add_jobs_non_empty(self):
        """Test adding jobs to a non-empty dictionary.

        Calls the offloader's `_add_new_jobs()` twice; once from
        initial conditions, and again after creating additional job
        directories.  After the second call, performs the standard
        assertions, and additionally checks that keys added by the
        first call still map to their original job objects.

        """
        self._run_add_new_jobs(self._initial_job_names)
        first_pass_jobs = dict(self._offloader._open_jobs)
        for jobdir in self.MOREJOBS:
            os.mkdir(jobdir)
        self._run_add_new_jobs(self._initial_job_names | set(self.MOREJOBS))
        for key, job in first_pass_jobs.items():
            self.assertIs(job, self._offloader._open_jobs[key])
1220
1221
class ReportingTests(_TempResultsDirTestBase):
    """Tests for `Offloader._report_failed_jobs()`."""

    def setUp(self):
        super(ReportingTests, self).setUp()
        self._offloader = gs_offloader.Offloader(_get_options([]))

        local_log_patcher = mock.patch.object(self._offloader,
                                              '_log_failed_jobs_locally')
        self.failed_jobs_patch = local_log_patcher.start()
        self.addCleanup(local_log_patcher.stop)

        debug_patcher = mock.patch.object(logging, 'debug')
        self.logging_patch = debug_patcher.start()
        self.addCleanup(debug_patcher.stop)

    def _add_job(self, jobdir):
        """Register a job in the offloader's set of unfinished jobs.

        @param jobdir Directory name for the new job.
        @return The newly created job object.
        """
        job = self.make_job(jobdir)
        self._offloader._open_jobs[job.dirname] = job
        return job


    def _expect_log_message(self, new_open_jobs, with_failures, count=None):
        """Check the logging calls made by `_report_failed_jobs()`.

        `_report_failed_jobs()` logs one message with the number of
        jobs removed from the open job set and the number of jobs
        still remaining.  Additionally, if there are reportable jobs,
        it logs the number of jobs that haven't yet offloaded.

        Counts are derived from `new_open_jobs` unless an explicit
        `count` is supplied.  If `with_failures` is true, the check
        assumes that all jobs in `new_open_jobs` have offload
        failures.

        @param new_open_jobs New job set for calculating counts
                             in the messages.
        @param with_failures Whether the log message with a
                             failure count is expected.
        @param count Optional explicit number of removed jobs.

        """
        removed = count or (len(self._offloader._open_jobs) -
                            len(new_open_jobs))
        self.logging_patch.assert_called_with(mock.ANY, removed,
                                              len(new_open_jobs))
        if with_failures:
            self.logging_patch.assert_called_with(mock.ANY,
                                                  len(new_open_jobs))

    def _run_update(self, new_open_jobs):
        """Call `_report_failed_jobs()` and check the result.

        Initial conditions are set up by the caller.  Calls
        `_report_failed_jobs()` once, then asserts that the
        offloader's `_open_jobs` field contains exactly the entries
        in `new_open_jobs`.

        @param new_open_jobs A dictionary representing the expected
                             new value of the offloader's
                             `_open_jobs` field.
        """
        self._offloader._report_failed_jobs()
        self._offloader._remove_offloaded_jobs()
        self.assertEqual(new_open_jobs, self._offloader._open_jobs)


    def _expect_failed_jobs(self, failed_jobs):
        """Mock expected call to log the failed jobs on local disk.

        TODO(crbug.com/686904): The fact that we have to mock an
        internal function for this test is evidence that we need to
        pull out the local file formatter into its own object in a
        future CL.

        @param failed_jobs: The list of jobs being logged as failed.
        """
        self._offloader._log_failed_jobs_locally(failed_jobs)


    def test_no_jobs(self):
        """Test `_report_failed_jobs()` with no open jobs.

        Starting from an empty `_open_jobs` list, expect it to
        remain empty.

        """
        self._expect_failed_jobs([])
        self._run_update({})
        self._expect_log_message({}, False)


    def test_all_completed(self):
        """Test `_report_failed_jobs()` with only complete jobs.

        Starting from an `_open_jobs` list consisting solely of
        completed jobs, expect the list to end up empty.

        """
        for jobdir in self.REGULAR_JOBLIST:
            self._add_job(jobdir).set_complete()
        removed = len(self._offloader._open_jobs)
        self._expect_failed_jobs([])
        self._run_update({})
        self._expect_log_message({}, False, removed)


    def test_none_finished(self):
        """Test `_report_failed_jobs()` with only unfinished jobs.

        Starting from an `_open_jobs` list consisting solely of
        unfinished jobs, expect no change to the list.

        """
        for jobdir in self.REGULAR_JOBLIST:
            self._add_job(jobdir)
        expected = dict(self._offloader._open_jobs)
        self._expect_failed_jobs([])
        self._run_update(expected)
        self._expect_log_message(expected, False)
1345
1346
class GsOffloaderMockTests(_TempResultsDirTestCase):
    """Tests using mock instead of mox."""

    def setUp(self):
        super(GsOffloaderMockTests, self).setUp()
        alarm = mock.patch('signal.alarm', return_value=0)
        alarm.start()
        self.addCleanup(alarm.stop)

        # Silence logging for the duration of the test.  The original
        # code saved the previous level in self._saved_loglevel but
        # never restored it, leaking the CRITICAL+1 level into any
        # tests that run afterwards; restore it via addCleanup.
        self._saved_loglevel = logging.getLogger().getEffectiveLevel()
        logging.getLogger().setLevel(logging.CRITICAL + 1)
        self.addCleanup(logging.getLogger().setLevel, self._saved_loglevel)

        self._job = self.make_job(self.REGULAR_JOBLIST[0])


    def test_offload_timeout_early(self):
        """Test that `offload_dir()` times out correctly.

        This test triggers timeout at the earliest possible moment,
        at the first call to set the timeout alarm.

        """
        signal.alarm.side_effect = [0, timeout_util.TimeoutError('fubar')]
        gs_offloader.GSOffloader(
                utils.DEFAULT_OFFLOAD_GSURI, False, 0).offload(
                        self._job.queue_args[0],
                        self._job.queue_args[1],
                        self._job.queue_args[2])
        # On timeout, the job directory must be left in place so a
        # later pass can retry the offload.
        self.assertTrue(os.path.isdir(self._job.queue_args[0]))


    # TODO(ayatane): This tests passes when run locally, but it fails
    # when run on trybot.  I have no idea why, but the assert isdir
    # fails.
    #
    # This test is also kind of redundant since we are using the timeout
    # from chromite which has its own tests.
    @unittest.skip('This fails on trybot')
    def test_offload_timeout_late(self):
        """Test that `offload_dir()` times out correctly.

        This test triggers timeout at the latest possible moment, at
        the call to clear the timeout alarm.

        """
        signal.alarm.side_effect = [0, 0, timeout_util.TimeoutError('fubar')]
        with mock.patch.object(gs_offloader, '_get_cmd_list',
                               autospec=True) as get_cmd_list:
            get_cmd_list.return_value = ['test', '-d', self._job.queue_args[0]]
            gs_offloader.GSOffloader(
                    utils.DEFAULT_OFFLOAD_GSURI, False, 0).offload(
                            self._job.queue_args[0],
                            self._job.queue_args[1],
                            self._job.queue_args[2])
            self.assertTrue(os.path.isdir(self._job.queue_args[0]))
1402
1403
1404
# Run the whole test module when executed directly as a script.
if __name__ == '__main__':
    unittest.main()
1407