xref: /aosp_15_r20/external/autotest/client/bin/job_unittest.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1#!/usr/bin/python3
2# pylint: disable=missing-docstring
3
4import logging
5import os
6import re
7import shutil
8import sys
9import tempfile
10import unittest
11
12from six.moves import range
13import six
14
15import common
16from autotest_lib.client.bin import job, sysinfo, harness
17from autotest_lib.client.bin import utils
18from autotest_lib.client.common_lib import error
19from autotest_lib.client.common_lib import logging_manager, logging_config
20from autotest_lib.client.common_lib import base_job_unittest
21from autotest_lib.client.common_lib.test_utils import mock
22
23
class job_test_case(unittest.TestCase):
    """Generic job TestCase class that defines a standard job setUp and
    tearDown, with some standard stubs."""

    job_class = job.base_client_job

    def setUp(self):
        """Stub the autodir lookup, create an uninitialized job and a
        scratch control file."""
        self.god = mock.mock_god(ut=self)
        self.god.stub_with(job.base_client_job, '_get_environ_autodir',
                           classmethod(lambda cls: '/adir'))
        # Allocate the job without running __init__; subclasses invoke
        # __init__ themselves once their own stubs are in place.
        self.job = self.job_class.__new__(self.job_class)
        self.job._job_directory = base_job_unittest.stub_job_directory

        # mkstemp() returns an open descriptor together with the path. Only
        # the path is used, so close the descriptor instead of leaking one
        # per test.
        control_fd, self.control_file = tempfile.mkstemp()
        os.close(control_fd)


    def tearDown(self):
        """Remove all stubs and delete the scratch control file."""
        self.god.unstub_all()
        os.remove(self.control_file)
43
44
class test_find_base_directories(
        base_job_unittest.test_find_base_directories.generic_tests,
        job_test_case):
    """Client job checks added on top of the generic base directory tests."""

    def test_autodir_equals_clientdir(self):
        """autodir and clientdir are both the stubbed '/adir'."""
        found_autodir, found_clientdir, _ = self.job._find_base_directories()
        for found in (found_autodir, found_clientdir):
            self.assertEqual(found, '/adir')


    def test_serverdir_is_none(self):
        """The third returned directory (serverdir) is None."""
        _, _, found_serverdir = self.job._find_base_directories()
        self.assertEqual(found_serverdir, None)
58
59
class abstract_test_init(base_job_unittest.test_init.generic_tests):
    """Generic client job mixin used when defining variations on the
    job.__init__ generic tests."""

    # Both sets start from the generic definitions and drop the attribute
    # names listed on the right-hand side.
    PUBLIC_ATTRIBUTES = (
            base_job_unittest.test_init.generic_tests.PUBLIC_ATTRIBUTES
            - {'force_full_log_collection'})

    OPTIONAL_ATTRIBUTES = (
            base_job_unittest.test_init.generic_tests.OPTIONAL_ATTRIBUTES
            - {'control', 'harness', 'force_full_log_collection'})
71
72
class test_init_minimal_options(abstract_test_init, job_test_case):
    """Runs the generic __init__ tests against a minimal options object."""

    def call_init(self):
        """Stub the APIs listed below with canned return values, then call
        __init__ on the job with the control file and minimal options."""
        # TODO(jadmanski): refactor more of the __init__ code to not need to
        # stub out countless random APIs
        class manager:
            def start_logging(self):
                return None

        class stub_sysinfo:
            def log_per_reboot_data(self):
                return None

        class stub_harness:
            def run_start(self):
                return None

        class options:
            tag = ''
            verbose = False
            cont = False
            harness = 'stub'
            harness_args = None
            hostname = None
            user = None
            log = False
            args = ''
            output_dir = ''

        # (owner, attribute name, canned return value)
        canned_returns = (
            (job.os, 'mkdir', None),
            (job.os.path, 'exists', True),
            (self.job, '_load_state', None),
            (self.job, 'record', None),
            (job.shutil, 'copyfile', None),
            (job.logging_manager, 'configure_logging', None),
            (job.logging_manager, 'get_logging_manager', manager()),
            (job.sysinfo, 'sysinfo', stub_sysinfo()),
            (job.harness, 'select', stub_harness()),
            (job.utils, 'drop_caches', None),
        )
        for owner, attr_name, canned_value in canned_returns:
            self.god.stub_function_to_return(owner, attr_name, canned_value)

        self.job._job_state = base_job_unittest.stub_job_state
        # The options class itself (not an instance) is passed in.
        self.job.__init__(self.control_file, options)
114
115
class placeholder(object):
    """Empty object that tests attach arbitrary attributes to."""
119
120
class first_line_comparator(mock.argument_comparator):
    """Argument comparator that only looks at the first line of a string."""

    def __init__(self, first_line):
        """Store the text that the first line has to equal."""
        self.first_line = first_line


    def is_satisfied_by(self, parameter):
        """Return whether the first line of parameter equals first_line."""
        leading_line = parameter.splitlines()[0]
        return self.first_line == leading_line
128
129
130class test_base_job(unittest.TestCase):
131    def setUp(self):
132        # make god
133        self.god = mock.mock_god(ut=self)
134
135        # need to set some environ variables
136        self.autodir = "autodir"
137        os.environ['AUTODIR'] = self.autodir
138
139        # set up some variables
140        _, self.control = tempfile.mkstemp()
141        self.jobtag = "jobtag"
142
143        # get rid of stdout and logging
144        sys.stdout = six.StringIO()
145        logging_manager.configure_logging(logging_config.TestingConfig())
146        logging.disable(logging.CRITICAL)
147        def placeholder_configure_logging(*args, **kwargs):
148            pass
149        self.god.stub_with(logging_manager, 'configure_logging',
150                           placeholder_configure_logging)
151        real_get_logging_manager = logging_manager.get_logging_manager
152        def get_logging_manager_no_fds(manage_stdout_and_stderr=False,
153                                       redirect_fds=False):
154            return real_get_logging_manager(manage_stdout_and_stderr, False)
155        self.god.stub_with(logging_manager, 'get_logging_manager',
156                           get_logging_manager_no_fds)
157
158        # stub out some stuff
159        self.god.stub_function(os.path, 'exists')
160        self.god.stub_function(os.path, 'isdir')
161        self.god.stub_function(os, 'makedirs')
162        self.god.stub_function(os, 'mkdir')
163        self.god.stub_function(os, 'remove')
164        self.god.stub_function(shutil, 'rmtree')
165        self.god.stub_function(shutil, 'copyfile')
166        self.god.stub_function(job, 'open')
167        self.god.stub_function(utils, 'system')
168        self.god.stub_function(utils, 'drop_caches')
169        self.god.stub_function(harness, 'select')
170        self.god.stub_function(sysinfo, 'log_per_reboot_data')
171
172        self.god.stub_class(job.local_host, 'LocalHost')
173        self.god.stub_class(sysinfo, 'sysinfo')
174
175        self.god.stub_class_method(job.base_client_job,
176                                   '_cleanup_debugdir_files')
177        self.god.stub_class_method(job.base_client_job, '_cleanup_results_dir')
178
179        self.god.stub_with(job.base_job.job_directory, '_ensure_valid',
180                           lambda *_: None)
181
182
    def tearDown(self):
        """Undo setUp: restore stdout, remove all stubs and delete the
        scratch control file."""
        sys.stdout = sys.__stdout__
        # os.remove is stubbed in setUp, so the stubs have to be removed
        # before the control file can really be deleted.
        self.god.unstub_all()
        os.remove(self.control)
187
188
189    def _setup_pre_record_init(self, cont):
190        self.god.stub_function(self.job, '_load_state')
191
192        resultdir = os.path.join(self.autodir, 'results', self.jobtag)
193        tmpdir = os.path.join(self.autodir, 'tmp')
194        if not cont:
195            job.base_client_job._cleanup_debugdir_files.expect_call()
196            job.base_client_job._cleanup_results_dir.expect_call()
197
198        self.job._load_state.expect_call()
199
200        my_harness = self.god.create_mock_class(harness.harness,
201                                                'my_harness')
202        harness.select.expect_call(None,
203                                   self.job,
204                                   None).and_return(my_harness)
205
206        return resultdir, my_harness
207
208
209    def _setup_post_record_init(self, cont, resultdir, my_harness):
210        # now some specific stubs
211        self.god.stub_function(self.job, 'config_get')
212        self.god.stub_function(self.job, 'config_set')
213        self.god.stub_function(self.job, 'record')
214
215        # other setup
216        results = os.path.join(self.autodir, 'results')
217        download = os.path.join(self.autodir, 'tests', 'download')
218        pkgdir = os.path.join(self.autodir, 'packages')
219
220        utils.drop_caches.expect_call()
221        job_sysinfo = sysinfo.sysinfo.expect_new(resultdir)
222        if not cont:
223            os.path.exists.expect_call(download).and_return(False)
224            os.mkdir.expect_call(download)
225            shutil.copyfile.expect_call(mock.is_string_comparator(),
226                                 os.path.join(resultdir, 'control'))
227
228        job.local_host.LocalHost.expect_new(hostname='localhost')
229        job_sysinfo.log_per_reboot_data.expect_call()
230        if not cont:
231            self.job.record.expect_call('START', None, None)
232
233        my_harness.run_start.expect_call()
234
235
236    def construct_job(self, cont):
237        # will construct class instance using __new__
238        self.job = job.base_client_job.__new__(job.base_client_job)
239
240        # record
241        resultdir, my_harness = self._setup_pre_record_init(cont)
242        self._setup_post_record_init(cont, resultdir, my_harness)
243
244        # finish constructor
245        options = placeholder()
246        options.tag = self.jobtag
247        options.cont = cont
248        options.harness = None
249        options.harness_args = None
250        options.log = False
251        options.verbose = False
252        options.hostname = 'localhost'
253        options.user = 'my_user'
254        options.args = ''
255        options.output_dir = ''
256        self.job.__init__(self.control, options)
257
258        # check
259        self.god.check_playback()
260
261
262    def get_partition_mock(self, devname):
263        """
264        Create a mock of a partition object and return it.
265        """
266        class mock(object):
267            device = devname
268            get_mountpoint = self.god.create_mock_function('get_mountpoint')
269        return mock
270
271
272    def test_constructor_first_run(self):
273        self.construct_job(False)
274
275
276    def test_constructor_continuation(self):
277        self.construct_job(True)
278
279
280    def test_constructor_post_record_failure(self):
281        """
282        Test post record initialization failure.
283        """
284        self.job = job.base_client_job.__new__(job.base_client_job)
285        options = placeholder()
286        options.tag = self.jobtag
287        options.cont = False
288        options.harness = None
289        options.harness_args = None
290        options.log = False
291        options.verbose = False
292        options.hostname = 'localhost'
293        options.user = 'my_user'
294        options.args = ''
295        options.output_dir = ''
296        error = Exception('fail')
297
298        self.god.stub_function(self.job, '_post_record_init')
299        self.god.stub_function(self.job, 'record')
300
301        self._setup_pre_record_init(False)
302        self.job._post_record_init.expect_call(
303                self.control, options, True).and_raises(error)
304        self.job.record.expect_call(
305                'ABORT', None, None,'client.bin.job.__init__ failed: %s' %
306                str(error))
307
308        self.assertRaises(
309                Exception, self.job.__init__, self.control, options,
310                drop_caches=True)
311
312        # check
313        self.god.check_playback()
314
315
316    def test_control_functions(self):
317        self.construct_job(True)
318        control_file = "blah"
319        self.job.control_set(control_file)
320        self.assertEquals(self.job.control_get(), os.path.abspath(control_file))
321
322
323    def test_harness_select(self):
324        self.construct_job(True)
325
326        # record
327        which = "which"
328        harness_args = ''
329        harness.select.expect_call(which, self.job,
330                                   harness_args).and_return(None)
331
332        # run and test
333        self.job.harness_select(which, harness_args)
334        self.god.check_playback()
335
336
337    def test_setup_dirs_raise(self):
338        self.construct_job(True)
339
340        # setup
341        results_dir = 'foo'
342        tmp_dir = 'bar'
343
344        # record
345        os.path.exists.expect_call(tmp_dir).and_return(True)
346        os.path.isdir.expect_call(tmp_dir).and_return(False)
347
348        # test
349        self.assertRaises(ValueError, self.job.setup_dirs, results_dir, tmp_dir)
350        self.god.check_playback()
351
352
353    def test_setup_dirs(self):
354        self.construct_job(True)
355
356        # setup
357        results_dir1 = os.path.join(self.job.resultdir, 'build')
358        results_dir2 = os.path.join(self.job.resultdir, 'build.2')
359        results_dir3 = os.path.join(self.job.resultdir, 'build.3')
360        tmp_dir = 'bar'
361
362        # record
363        os.path.exists.expect_call(tmp_dir).and_return(False)
364        os.mkdir.expect_call(tmp_dir)
365        os.path.isdir.expect_call(tmp_dir).and_return(True)
366        os.path.exists.expect_call(results_dir1).and_return(True)
367        os.path.exists.expect_call(results_dir2).and_return(True)
368        os.path.exists.expect_call(results_dir3).and_return(False)
369        os.path.exists.expect_call(results_dir3).and_return(False)
370        os.mkdir.expect_call(results_dir3)
371
372        # test
373        self.assertEqual(self.job.setup_dirs(None, tmp_dir),
374                         (results_dir3, tmp_dir))
375        self.god.check_playback()
376
377
378    def test_run_test_logs_test_error_from_unhandled_error(self):
379        self.construct_job(True)
380
381        # set up stubs
382        self.god.stub_function(self.job.pkgmgr, 'get_package_name')
383        self.god.stub_function(self.job, "_runtest")
384
385        # create an unhandled error object
386        class MyError(error.TestError):
387            pass
388        real_error = MyError("this is the real error message")
389        unhandled_error = error.UnhandledTestError(real_error)
390
391        # set up the recording
392        testname = "error_test"
393        outputdir = os.path.join(self.job.resultdir, testname)
394        self.job.pkgmgr.get_package_name.expect_call(
395            testname, 'test').and_return(("", testname))
396        os.path.exists.expect_call(outputdir).and_return(False)
397        self.job.record.expect_call("START", testname, testname,
398                                    optional_fields=None)
399        self.job._runtest.expect_call(testname, "", None, (), {}).and_raises(
400            unhandled_error)
401        self.job.record.expect_call("ERROR", testname, testname,
402                                    first_line_comparator(str(real_error)))
403        self.job.record.expect_call("END ERROR", testname, testname)
404        self.job.harness.run_test_complete.expect_call()
405        utils.drop_caches.expect_call()
406
407        # run and check
408        self.job.run_test(testname)
409        self.god.check_playback()
410
411
412    def test_run_test_logs_non_test_error_from_unhandled_error(self):
413        self.construct_job(True)
414
415        # set up stubs
416        self.god.stub_function(self.job.pkgmgr, 'get_package_name')
417        self.god.stub_function(self.job, "_runtest")
418
419        # create an unhandled error object
420        class MyError(Exception):
421            pass
422        real_error = MyError("this is the real error message")
423        unhandled_error = error.UnhandledTestError(real_error)
424        reason = first_line_comparator("Unhandled MyError: %s" % real_error)
425
426        # set up the recording
427        testname = "error_test"
428        outputdir = os.path.join(self.job.resultdir, testname)
429        self.job.pkgmgr.get_package_name.expect_call(
430            testname, 'test').and_return(("", testname))
431        os.path.exists.expect_call(outputdir).and_return(False)
432        self.job.record.expect_call("START", testname, testname,
433                                    optional_fields=None)
434        self.job._runtest.expect_call(testname, "", None, (), {}).and_raises(
435            unhandled_error)
436        self.job.record.expect_call("ERROR", testname, testname, reason)
437        self.job.record.expect_call("END ERROR", testname, testname)
438        self.job.harness.run_test_complete.expect_call()
439        utils.drop_caches.expect_call()
440
441        # run and check
442        self.job.run_test(testname)
443        self.god.check_playback()
444
445
446    def test_report_reboot_failure(self):
447        self.construct_job(True)
448
449        # record
450        self.job.record.expect_call("ABORT", "sub", "reboot.verify",
451                                    "boot failure")
452        self.job.record.expect_call("END ABORT", "sub", "reboot",
453                                    optional_fields={"kernel": "2.6.15-smp"})
454
455        # playback
456        self.job._record_reboot_failure("sub", "reboot.verify", "boot failure",
457                                        running_id="2.6.15-smp")
458        self.god.check_playback()
459
460
461    def _setup_check_post_reboot(self, mount_info, cpu_count):
462        # setup
463        self.god.stub_function(job.partition_lib, "get_partition_list")
464        self.god.stub_function(utils, "count_cpus")
465
466        part_list = [self.get_partition_mock("/dev/hda1"),
467                     self.get_partition_mock("/dev/hdb1")]
468        mount_list = ["/mnt/hda1", "/mnt/hdb1"]
469
470        # record
471        job.partition_lib.get_partition_list.expect_call(
472                self.job, exclude_swap=False).and_return(part_list)
473        for i in range(len(part_list)):
474            part_list[i].get_mountpoint.expect_call().and_return(mount_list[i])
475        if cpu_count is not None:
476            utils.count_cpus.expect_call().and_return(cpu_count)
477        self.job._state.set('client', 'mount_info', mount_info)
478        self.job._state.set('client', 'cpu_count', 8)
479
480
481    def test_check_post_reboot_success(self):
482        self.construct_job(True)
483
484        mount_info = set([("/dev/hda1", "/mnt/hda1"),
485                          ("/dev/hdb1", "/mnt/hdb1")])
486        self._setup_check_post_reboot(mount_info, 8)
487
488        # playback
489        self.job._check_post_reboot("sub")
490        self.god.check_playback()
491
492
493    def test_check_post_reboot_mounts_failure(self):
494        self.construct_job(True)
495
496        mount_info = set([("/dev/hda1", "/mnt/hda1")])
497        self._setup_check_post_reboot(mount_info, None)
498
499        self.god.stub_function(self.job, "_record_reboot_failure")
500
501        if six.PY2:
502            self.job._record_reboot_failure.expect_call(
503                    "sub",
504                    "reboot.verify_config",
505                    "mounted partitions are different after"
506                    " reboot (old entries: set([]), new entries: set([('/dev/hdb1',"
507                    " '/mnt/hdb1')]))",
508                    running_id=None)
509        else:
510            # Py3 string formatting of sets is a bit different...
511            self.job._record_reboot_failure.expect_call(
512                    "sub",
513                    "reboot.verify_config",
514                    "mounted partitions are different after"
515                    " reboot (old entries: set(), new entries: {('/dev/hdb1',"
516                    " '/mnt/hdb1')})",
517                    running_id=None)
518
519        # playback
520        self.assertRaises(error.JobError, self.job._check_post_reboot, "sub")
521        self.god.check_playback()
522
523
524    def test_check_post_reboot_cpu_failure(self):
525        self.construct_job(True)
526
527        mount_info = set([("/dev/hda1", "/mnt/hda1"),
528                          ("/dev/hdb1", "/mnt/hdb1")])
529        self._setup_check_post_reboot(mount_info, 4)
530
531        self.god.stub_function(self.job, "_record_reboot_failure")
532        self.job._record_reboot_failure.expect_call(
533            'sub', 'reboot.verify_config',
534            'Number of CPUs changed after reboot (old count: 8, new count: 4)',
535            running_id=None)
536
537        # playback
538        self.assertRaises(error.JobError, self.job._check_post_reboot, "sub")
539        self.god.check_playback()
540
541
542    def test_parse_args(self):
543        test_set = {"a='foo bar baz' b='moo apt'":
544                    ["a='foo bar baz'", "b='moo apt'"],
545                    "a='foo bar baz' only=gah":
546                    ["a='foo bar baz'", "only=gah"],
547                    "a='b c d' no=argh":
548                    ["a='b c d'", "no=argh"]}
549        for t in test_set:
550            parsed_args = job.base_client_job._parse_args(t)
551            expected_args = test_set[t]
552            self.assertEqual(parsed_args, expected_args)
553
554
555    def test_run_test_timeout_parameter_is_propagated(self):
556        self.construct_job(True)
557
558        # set up stubs
559        self.god.stub_function(self.job.pkgmgr, 'get_package_name')
560        self.god.stub_function(self.job, "_runtest")
561
562        # create an unhandled error object
563        #class MyError(error.TestError):
564        #    pass
565        #real_error = MyError("this is the real error message")
566        #unhandled_error = error.UnhandledTestError(real_error)
567
568        # set up the recording
569        testname = "test"
570        outputdir = os.path.join(self.job.resultdir, testname)
571        self.job.pkgmgr.get_package_name.expect_call(
572            testname, 'test').and_return(("", testname))
573        os.path.exists.expect_call(outputdir).and_return(False)
574        timeout = 60
575        optional_fields = {}
576        optional_fields['timeout'] = timeout
577        self.job.record.expect_call("START", testname, testname,
578                                    optional_fields=optional_fields)
579        self.job._runtest.expect_call(testname, "", timeout, (), {})
580        self.job.record.expect_call("GOOD", testname, testname,
581                                    "completed successfully")
582        self.job.record.expect_call("END GOOD", testname, testname)
583        self.job.harness.run_test_complete.expect_call()
584        utils.drop_caches.expect_call()
585
586        # run and check
587        self.job.run_test(testname, timeout=timeout)
588        self.god.check_playback()
589
590
class test_name_pattern(unittest.TestCase):
    """Tests for _NAME_PATTERN."""

    def _one_name_pattern_test(self, line, want):
        """Check that line matches _NAME_PATTERN and group 1 equals want."""
        found = re.match(job._NAME_PATTERN, line)
        self.assertIsNotNone(found)
        captured_name = found.group(1)
        self.assertEqual(captured_name, want)

    def test_name_pattern_nospace_single(self):
        """Single quotes, no spaces around '='."""
        self._one_name_pattern_test(line="NAME='some_Test'", want='some_Test')

    def test_name_pattern_nospace_double(self):
        """Double quotes, no spaces around '='."""
        self._one_name_pattern_test(line='NAME="some_Test"', want='some_Test')

    def test_name_pattern_space_single(self):
        """Single quotes, spaces around '='."""
        self._one_name_pattern_test(line="NAME  =  'some_Test'",
                                    want='some_Test')

    def test_name_pattern_space_double(self):
        """Double quotes, spaces around '='."""
        self._one_name_pattern_test(line='NAME  =  "some_Test"',
                                    want='some_Test')
611
612
# Run every test case in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
615