xref: /aosp_15_r20/external/autotest/server/cros/dynamic_suite/job_status_unittest.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1#!/usr/bin/env python3
2#
3# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7# pylint: disable-msg=C0111
8
9"""Unit tests for server/cros/dynamic_suite/job_status.py."""
10
11from __future__ import absolute_import
12from __future__ import division
13from __future__ import print_function
14
15import os
16import shutil
17from six.moves import map
18from six.moves import range
19import tempfile
20import unittest
21from unittest import mock
22from unittest.mock import patch
23
24import common
25
26from autotest_lib.server import frontend
27from autotest_lib.server.cros.dynamic_suite import job_status
28from autotest_lib.server.cros.dynamic_suite.fakes import FakeJob
29from autotest_lib.server.cros.dynamic_suite.fakes import FakeStatus
30
31
32DEFAULT_WAITTIMEOUT_MINS = 60 * 4
33
34
35class StatusTest(unittest.TestCase):
36    """Unit tests for job_status.Status.
37    """
38
39
40    def setUp(self):
41        super(StatusTest, self).setUp()
42        afe_patcher = patch.object(frontend, 'AFE')
43        self.afe = afe_patcher.start()
44        self.addCleanup(afe_patcher.stop)
45        tko_patcher = patch.object(frontend, 'TKO')
46        self.tko = tko_patcher.start()
47        self.addCleanup(tko_patcher.stop)
48        self.tmpdir = tempfile.mkdtemp(suffix=type(self).__name__)
49        # These are called a few times, so we need to return via side_effect.
50        # for some reason side_effect doesn't like appending, so just keeping
51        # a list to then be added at once.
52        self.tko.get_job_test_statuses_from_db.side_effect = []
53        self.afe.run.side_effect = []
54        self.run_list = []
55        self.run_call_list = []
56        self.job_statuses = []
57        self.job_statuses_call_list = []
58
59    def tearDown(self):
60        super(StatusTest, self).tearDown()
61        shutil.rmtree(self.tmpdir, ignore_errors=True)
62
63    def expect_yield_job_entries(self, job):
64        entries = [s.entry for s in job.statuses]
65        self.run_list.append(entries)
66        self.run_call_list.append(
67                mock.call('get_host_queue_entries', job=job.id))
68
69        if True not in ['aborted' in e and e['aborted'] for e in entries]:
70            self.job_statuses.append(job.statuses)
71            self.job_statuses_call_list.append(mock.call(job.id))
72
73    @patch('autotest_lib.server.cros.dynamic_suite.job_status.JobResultWaiter._sleep'
74           )
75    def testJobResultWaiter(self, mock_sleep):
76        """Should gather status and return records for job summaries."""
77        jobs = [FakeJob(0, [FakeStatus('GOOD', 'T0', ''),
78                            FakeStatus('GOOD', 'T1', '')]),
79                FakeJob(1, [FakeStatus('ERROR', 'T0', 'err', False),
80                            FakeStatus('GOOD', 'T1', '')]),
81                FakeJob(2, [FakeStatus('TEST_NA', 'T0', 'no')]),
82                FakeJob(3, [FakeStatus('FAIL', 'T0', 'broken')]),
83                FakeJob(4, [FakeStatus('ERROR', 'SERVER_JOB', 'server error'),
84                            FakeStatus('GOOD', 'T0', '')]),]
85        # TODO: Write a better test for the case where we yield
86        # results for aborts vs cannot yield results because of
87        # a premature abort. Currently almost all client aborts
88        # have been converted to failures, and when aborts do happen
89        # they result in server job failures for which we always
90        # want results.
91        # FakeJob(5, [FakeStatus('ERROR', 'T0', 'gah', True)]),
92        # The next job shouldn't be recorded in the results.
93        # FakeJob(6, [FakeStatus('GOOD', 'SERVER_JOB', '')])]
94        for status in jobs[4].statuses:
95            status.entry['job'] = {'name': 'broken_infra_job'}
96
97        job_id_set = set([job.id for job in jobs])
98        yield_values = [
99                [jobs[1]],
100                [jobs[0], jobs[2]],
101                jobs[3:6]
102            ]
103
104        yield_list = []
105        called_list = []
106
107        for yield_this in yield_values:
108            yield_list.append(yield_this)
109
110            # Expected list of calls...
111            called_list.append(
112                    mock.call(id__in=list(job_id_set), finished=True))
113            for job in yield_this:
114                self.expect_yield_job_entries(job)
115                job_id_set.remove(job.id)
116        self.afe.get_jobs.side_effect = yield_list
117        self.afe.run.side_effect = self.run_list
118        self.tko.get_job_test_statuses_from_db.side_effect = self.job_statuses
119
120        waiter = job_status.JobResultWaiter(self.afe, self.tko)
121        waiter.add_jobs(jobs)
122        results = [result for result in waiter.wait_for_results()]
123        for job in jobs[:6]:  # the 'GOOD' SERVER_JOB shouldn't be there.
124            for status in job.statuses:
125                self.assertTrue(True in list(map(status.equals_record, results)))
126
127        self.afe.get_jobs.assert_has_calls(called_list)
128        self.afe.run.assert_has_calls(self.run_call_list)
129        self.tko.get_job_test_statuses_from_db.assert_has_calls(
130                self.job_statuses_call_list)
131
132    def testYieldSubdir(self):
133        """Make sure subdir are properly set for test and non-test status."""
134        job_tag = '0-owner/172.33.44.55'
135        job_name = 'broken_infra_job'
136        job = FakeJob(0, [FakeStatus('ERROR', 'SERVER_JOB', 'server error',
137                                     subdir='---', job_tag=job_tag),
138                          FakeStatus('GOOD', 'T0', '',
139                                     subdir='T0.subdir', job_tag=job_tag)],
140                      parent_job_id=54321)
141        for status in job.statuses:
142            status.entry['job'] = {'name': job_name}
143
144        self.expect_yield_job_entries(job)
145        self.afe.run.side_effect = self.run_list
146        self.tko.get_job_test_statuses_from_db.side_effect = self.job_statuses
147        results = list(job_status._yield_job_results(self.afe, self.tko, job))
148        for i in range(len(results)):
149            result = results[i]
150            if result.test_name.endswith('SERVER_JOB'):
151                expected_name = '%s_%s' % (job_name, job.statuses[i].test_name)
152                expected_subdir = job_tag
153            else:
154                expected_name = job.statuses[i].test_name
155                expected_subdir = os.path.join(job_tag, job.statuses[i].subdir)
156            self.assertEqual(results[i].test_name, expected_name)
157            self.assertEqual(results[i].subdir, expected_subdir)
158
159        self.afe.run.assert_has_calls(self.run_call_list)
160        self.tko.get_job_test_statuses_from_db.assert_has_calls(
161                self.job_statuses_call_list)
162
163
164if __name__ == '__main__':
165    unittest.main()
166