xref: /aosp_15_r20/external/autotest/tko/utils_unittest.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1*9c5db199SXin Li#!/usr/bin/python3
2*9c5db199SXin Li
3*9c5db199SXin Lifrom __future__ import absolute_import
4*9c5db199SXin Lifrom __future__ import division
5*9c5db199SXin Lifrom __future__ import print_function
6*9c5db199SXin Li
7*9c5db199SXin Liimport os, unittest, time, datetime, itertools
8*9c5db199SXin Li
9*9c5db199SXin Liimport common
10*9c5db199SXin Lifrom autotest_lib.client.common_lib.test_utils import mock
11*9c5db199SXin Lifrom autotest_lib.tko import utils
12*9c5db199SXin Lifrom six.moves import zip
13*9c5db199SXin Li
14*9c5db199SXin Li
15*9c5db199SXin Liclass get_timestamp_test(unittest.TestCase):
16*9c5db199SXin Li    def test_zero_time(self):
17*9c5db199SXin Li        date = utils.get_timestamp({"key": "0"}, "key")
18*9c5db199SXin Li        timezone = datetime.timedelta(seconds=time.timezone)
19*9c5db199SXin Li        utc_date = date + timezone
20*9c5db199SXin Li        # should be equal to epoch, i.e. Jan 1, 1970
21*9c5db199SXin Li        self.assertEquals(utc_date.year, 1970)
22*9c5db199SXin Li        self.assertEquals(utc_date.month, 1)
23*9c5db199SXin Li        self.assertEquals(utc_date.day, 1)
24*9c5db199SXin Li        self.assertEquals(utc_date.hour, 0)
25*9c5db199SXin Li        self.assertEquals(utc_date.minute, 0)
26*9c5db199SXin Li        self.assertEquals(utc_date.second, 0)
27*9c5db199SXin Li        self.assertEquals(utc_date.microsecond, 0)
28*9c5db199SXin Li
29*9c5db199SXin Li
30*9c5db199SXin Li    def test_returns_none_on_missing_value(self):
31*9c5db199SXin Li        date = utils.get_timestamp({}, "missing_key")
32*9c5db199SXin Li        self.assertEquals(date, None)
33*9c5db199SXin Li
34*9c5db199SXin Li
35*9c5db199SXin Li    def test_fails_on_non_integer_values(self):
36*9c5db199SXin Li        self.assertRaises(ValueError, utils.get_timestamp,
37*9c5db199SXin Li                          {"key": "zero"}, "key")
38*9c5db199SXin Li
39*9c5db199SXin Li
40*9c5db199SXin Li    def test_date_can_be_string_or_integer(self):
41*9c5db199SXin Li        int_times = [1, 12, 123, 1234, 12345, 123456]
42*9c5db199SXin Li        str_times = [str(t) for t in int_times]
43*9c5db199SXin Li        for int_t, str_t in zip(int_times, str_times):
44*9c5db199SXin Li            date_int = utils.get_timestamp({"key": int_t}, "key")
45*9c5db199SXin Li            date_str = utils.get_timestamp({"key": str_t}, "key")
46*9c5db199SXin Li            self.assertEquals(date_int, date_str)
47*9c5db199SXin Li
48*9c5db199SXin Li
49*9c5db199SXin Liclass find_toplevel_job_dir_test(unittest.TestCase):
50*9c5db199SXin Li    def setUp(self):
51*9c5db199SXin Li        self.god = mock.mock_god()
52*9c5db199SXin Li        self.god.stub_function(os.path, "exists")
53*9c5db199SXin Li
54*9c5db199SXin Li
55*9c5db199SXin Li    def tearDown(self):
56*9c5db199SXin Li        self.god.unstub_all()
57*9c5db199SXin Li
58*9c5db199SXin Li
59*9c5db199SXin Li    def test_start_is_toplevel(self):
60*9c5db199SXin Li        jobdir = "/results/job1"
61*9c5db199SXin Li        os.path.exists.expect_call(
62*9c5db199SXin Li            jobdir + "/.autoserv_execute").and_return(True)
63*9c5db199SXin Li        self.assertEqual(utils.find_toplevel_job_dir(jobdir), jobdir)
64*9c5db199SXin Li
65*9c5db199SXin Li
66*9c5db199SXin Li    def test_parent_is_toplevel(self):
67*9c5db199SXin Li        jobdir = "/results/job2"
68*9c5db199SXin Li        os.path.exists.expect_call(
69*9c5db199SXin Li            jobdir + "/sub/.autoserv_execute").and_return(False)
70*9c5db199SXin Li        os.path.exists.expect_call(
71*9c5db199SXin Li            jobdir + "/.autoserv_execute").and_return(True)
72*9c5db199SXin Li        self.assertEqual(utils.find_toplevel_job_dir(jobdir + "/sub"), jobdir)
73*9c5db199SXin Li
74*9c5db199SXin Li
75*9c5db199SXin Li    def test_grandparent_is_toplevel(self):
76*9c5db199SXin Li        jobdir = "/results/job3"
77*9c5db199SXin Li        os.path.exists.expect_call(
78*9c5db199SXin Li            jobdir + "/sub/sub/.autoserv_execute").and_return(False)
79*9c5db199SXin Li        os.path.exists.expect_call(
80*9c5db199SXin Li            jobdir + "/sub/.autoserv_execute").and_return(False)
81*9c5db199SXin Li        os.path.exists.expect_call(
82*9c5db199SXin Li            jobdir + "/.autoserv_execute").and_return(True)
83*9c5db199SXin Li        self.assertEqual(utils.find_toplevel_job_dir(jobdir + "/sub/sub"),
84*9c5db199SXin Li                         jobdir)
85*9c5db199SXin Li
86*9c5db199SXin Li    def test_root_is_toplevel(self):
87*9c5db199SXin Li        jobdir = "/results/job4"
88*9c5db199SXin Li        os.path.exists.expect_call(
89*9c5db199SXin Li            jobdir + "/.autoserv_execute").and_return(False)
90*9c5db199SXin Li        os.path.exists.expect_call(
91*9c5db199SXin Li            "/results/.autoserv_execute").and_return(False)
92*9c5db199SXin Li        os.path.exists.expect_call("/.autoserv_execute").and_return(True)
93*9c5db199SXin Li        self.assertEqual(utils.find_toplevel_job_dir(jobdir), "/")
94*9c5db199SXin Li
95*9c5db199SXin Li
96*9c5db199SXin Li    def test_no_toplevel(self):
97*9c5db199SXin Li        jobdir = "/results/job5"
98*9c5db199SXin Li        os.path.exists.expect_call(
99*9c5db199SXin Li            jobdir + "/.autoserv_execute").and_return(False)
100*9c5db199SXin Li        os.path.exists.expect_call(
101*9c5db199SXin Li            "/results/.autoserv_execute").and_return(False)
102*9c5db199SXin Li        os.path.exists.expect_call("/.autoserv_execute").and_return(False)
103*9c5db199SXin Li        self.assertEqual(utils.find_toplevel_job_dir(jobdir), None)
104*9c5db199SXin Li
105*9c5db199SXin Li
106*9c5db199SXin Liclass drop_redundant_messages(unittest.TestCase):
107*9c5db199SXin Li    def test_empty_set(self):
108*9c5db199SXin Li        self.assertEqual(utils.drop_redundant_messages(set()), set())
109*9c5db199SXin Li
110*9c5db199SXin Li
111*9c5db199SXin Li    def test_singleton(self):
112*9c5db199SXin Li        self.assertEqual(utils.drop_redundant_messages(set(["abc"])),
113*9c5db199SXin Li                         set(["abc"]))
114*9c5db199SXin Li
115*9c5db199SXin Li
116*9c5db199SXin Li    def test_distinct_messages(self):
117*9c5db199SXin Li        self.assertEqual(utils.drop_redundant_messages(set(["abc", "def"])),
118*9c5db199SXin Li                         set(["abc", "def"]))
119*9c5db199SXin Li
120*9c5db199SXin Li
121*9c5db199SXin Li    def test_one_unique_message(self):
122*9c5db199SXin Li        self.assertEqual(
123*9c5db199SXin Li                utils.drop_redundant_messages(set(["abc", "abcd", "abcde"])),
124*9c5db199SXin Li                set(["abcde"]))
125*9c5db199SXin Li
126*9c5db199SXin Li
127*9c5db199SXin Li    def test_some_unique_some_not(self):
128*9c5db199SXin Li        self.assertEqual(
129*9c5db199SXin Li                utils.drop_redundant_messages(set(["abc", "def", "abcdef",
130*9c5db199SXin Li                                                   "defghi", "cd"])),
131*9c5db199SXin Li                set(["abcdef", "defghi"]))
132*9c5db199SXin Li
133*9c5db199SXin Li
134*9c5db199SXin Liif __name__ == "__main__":
135*9c5db199SXin Li    unittest.main()
136