1#!/usr/bin/env python3 2# 3# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 4# Use of this source code is governed by a BSD-style license that can be 5# found in the LICENSE file. 6 7# pylint: disable-msg=C0111 8 9"""Unit tests for server/cros/dynamic_suite/job_status.py.""" 10 11from __future__ import absolute_import 12from __future__ import division 13from __future__ import print_function 14 15import os 16import shutil 17from six.moves import map 18from six.moves import range 19import tempfile 20import unittest 21from unittest import mock 22from unittest.mock import patch 23 24import common 25 26from autotest_lib.server import frontend 27from autotest_lib.server.cros.dynamic_suite import job_status 28from autotest_lib.server.cros.dynamic_suite.fakes import FakeJob 29from autotest_lib.server.cros.dynamic_suite.fakes import FakeStatus 30 31 32DEFAULT_WAITTIMEOUT_MINS = 60 * 4 33 34 35class StatusTest(unittest.TestCase): 36 """Unit tests for job_status.Status. 37 """ 38 39 40 def setUp(self): 41 super(StatusTest, self).setUp() 42 afe_patcher = patch.object(frontend, 'AFE') 43 self.afe = afe_patcher.start() 44 self.addCleanup(afe_patcher.stop) 45 tko_patcher = patch.object(frontend, 'TKO') 46 self.tko = tko_patcher.start() 47 self.addCleanup(tko_patcher.stop) 48 self.tmpdir = tempfile.mkdtemp(suffix=type(self).__name__) 49 # These are called a few times, so we need to return via side_effect. 50 # for some reason side_effect doesn't like appending, so just keeping 51 # a list to then be added at once. 52 self.tko.get_job_test_statuses_from_db.side_effect = [] 53 self.afe.run.side_effect = [] 54 self.run_list = [] 55 self.run_call_list = [] 56 self.job_statuses = [] 57 self.job_statuses_call_list = [] 58 59 def tearDown(self): 60 super(StatusTest, self).tearDown() 61 shutil.rmtree(self.tmpdir, ignore_errors=True) 62 63 def expect_yield_job_entries(self, job): 64 entries = [s.entry for s in job.statuses] 65 self.run_list.append(entries) 66 self.run_call_list.append( 67 mock.call('get_host_queue_entries', job=job.id)) 68 69 if True not in ['aborted' in e and e['aborted'] for e in entries]: 70 self.job_statuses.append(job.statuses) 71 self.job_statuses_call_list.append(mock.call(job.id)) 72 73 @patch('autotest_lib.server.cros.dynamic_suite.job_status.JobResultWaiter._sleep' 74 ) 75 def testJobResultWaiter(self, mock_sleep): 76 """Should gather status and return records for job summaries.""" 77 jobs = [FakeJob(0, [FakeStatus('GOOD', 'T0', ''), 78 FakeStatus('GOOD', 'T1', '')]), 79 FakeJob(1, [FakeStatus('ERROR', 'T0', 'err', False), 80 FakeStatus('GOOD', 'T1', '')]), 81 FakeJob(2, [FakeStatus('TEST_NA', 'T0', 'no')]), 82 FakeJob(3, [FakeStatus('FAIL', 'T0', 'broken')]), 83 FakeJob(4, [FakeStatus('ERROR', 'SERVER_JOB', 'server error'), 84 FakeStatus('GOOD', 'T0', '')]),] 85 # TODO: Write a better test for the case where we yield 86 # results for aborts vs cannot yield results because of 87 # a premature abort. Currently almost all client aborts 88 # have been converted to failures, and when aborts do happen 89 # they result in server job failures for which we always 90 # want results. 91 # FakeJob(5, [FakeStatus('ERROR', 'T0', 'gah', True)]), 92 # The next job shouldn't be recorded in the results. 93 # FakeJob(6, [FakeStatus('GOOD', 'SERVER_JOB', '')])] 94 for status in jobs[4].statuses: 95 status.entry['job'] = {'name': 'broken_infra_job'} 96 97 job_id_set = set([job.id for job in jobs]) 98 yield_values = [ 99 [jobs[1]], 100 [jobs[0], jobs[2]], 101 jobs[3:6] 102 ] 103 104 yield_list = [] 105 called_list = [] 106 107 for yield_this in yield_values: 108 yield_list.append(yield_this) 109 110 # Expected list of calls... 111 called_list.append( 112 mock.call(id__in=list(job_id_set), finished=True)) 113 for job in yield_this: 114 self.expect_yield_job_entries(job) 115 job_id_set.remove(job.id) 116 self.afe.get_jobs.side_effect = yield_list 117 self.afe.run.side_effect = self.run_list 118 self.tko.get_job_test_statuses_from_db.side_effect = self.job_statuses 119 120 waiter = job_status.JobResultWaiter(self.afe, self.tko) 121 waiter.add_jobs(jobs) 122 results = [result for result in waiter.wait_for_results()] 123 for job in jobs[:6]: # the 'GOOD' SERVER_JOB shouldn't be there. 124 for status in job.statuses: 125 self.assertTrue(True in list(map(status.equals_record, results))) 126 127 self.afe.get_jobs.assert_has_calls(called_list) 128 self.afe.run.assert_has_calls(self.run_call_list) 129 self.tko.get_job_test_statuses_from_db.assert_has_calls( 130 self.job_statuses_call_list) 131 132 def testYieldSubdir(self): 133 """Make sure subdir are properly set for test and non-test status.""" 134 job_tag = '0-owner/172.33.44.55' 135 job_name = 'broken_infra_job' 136 job = FakeJob(0, [FakeStatus('ERROR', 'SERVER_JOB', 'server error', 137 subdir='---', job_tag=job_tag), 138 FakeStatus('GOOD', 'T0', '', 139 subdir='T0.subdir', job_tag=job_tag)], 140 parent_job_id=54321) 141 for status in job.statuses: 142 status.entry['job'] = {'name': job_name} 143 144 self.expect_yield_job_entries(job) 145 self.afe.run.side_effect = self.run_list 146 self.tko.get_job_test_statuses_from_db.side_effect = self.job_statuses 147 results = list(job_status._yield_job_results(self.afe, self.tko, job)) 148 for i in range(len(results)): 149 result = results[i] 150 if result.test_name.endswith('SERVER_JOB'): 151 expected_name = '%s_%s' % (job_name, job.statuses[i].test_name) 152 expected_subdir = job_tag 153 else: 154 expected_name = job.statuses[i].test_name 155 expected_subdir = os.path.join(job_tag, job.statuses[i].subdir) 156 self.assertEqual(results[i].test_name, expected_name) 157 self.assertEqual(results[i].subdir, expected_subdir) 158 159 self.afe.run.assert_has_calls(self.run_call_list) 160 self.tko.get_job_test_statuses_from_db.assert_has_calls( 161 self.job_statuses_call_list) 162 163 164if __name__ == '__main__': 165 unittest.main() 166