xref: /aosp_15_r20/external/autotest/server/cros/cfm/configurable_test/actions_unittest.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1import unittest
2from unittest import mock
3
4from autotest_lib.client.common_lib.cros.cfm.usb import usb_device
5from autotest_lib.client.common_lib.cros.cfm.usb import usb_device_spec
6from autotest_lib.server.cros.cfm.configurable_test import actions
7from autotest_lib.server.cros.cfm.configurable_test import action_context
8from autotest_lib.server.cros.cfm.configurable_test import scenario
9
10# Constants to use in case the actual values are irrelevant.
11USB_DEVICE_SPEC = usb_device_spec.UsbDeviceSpec(
12        'vid', 'pid', 'product', ['iface'])
13
14USB_DEVICE = usb_device.UsbDevice('v', 'p', 'prod', ['if'], 1, 2, 1)
15
16
17# Test, disable missing-docstring
18# pylint: disable=missing-docstring
19class TestActions(unittest.TestCase):
20    """
21    Tests for the available actions for configurable CFM tests to run.
22    """
23
24    def setUp(self):
25        self.host_mock = mock.MagicMock()
26        self.cfm_facade_mock = mock.MagicMock()
27        self.usb_device_collector_mock = mock.MagicMock()
28        self.usb_port_manager_mock = mock.MagicMock()
29        self.crash_detector_mock = mock.MagicMock()
30        self.metrics_collector_mock = mock.MagicMock()
31        self.context_with_mocks = action_context.ActionContext(
32                host=self.host_mock,
33                cfm_facade=self.cfm_facade_mock,
34                usb_device_collector=self.usb_device_collector_mock,
35                usb_port_manager=self.usb_port_manager_mock,
36                crash_detector=self.crash_detector_mock,
37                perf_metrics_collector=self.metrics_collector_mock)
38
39
40    def test_assert_file_does_not_contain_no_match(self):
41        action = actions.AssertFileDoesNotContain('/foo', ['EE', 'WW'])
42        context = action_context.ActionContext(
43                file_contents_collector=FakeCollector('abc\ndef'))
44        action.execute(context)
45
46    def test_assert_file_does_not_contain_match(self):
47        action = actions.AssertFileDoesNotContain('/foo', ['EE', 'WW'])
48        context = action_context.ActionContext(
49                file_contents_collector=FakeCollector('abc\naWWd'))
50        self.assertRaises(AssertionError, lambda: action.execute(context))
51
52    def test_assert_file_does_not_contain_regex_match(self):
53        action = actions.AssertFileDoesNotContain('/foo', ['EE', 'W{3}Q+'])
54        context = action_context.ActionContext(
55                file_contents_collector=FakeCollector('abc\naWWWQQd'))
56        self.assertRaises(AssertionError, lambda: action.execute(context))
57
58    def test_reboot_dut_no_restart(self):
59        action = actions.RebootDut()
60        action.execute(self.context_with_mocks)
61        self.host_mock.reboot.assert_called_once_with()
62        self.assertFalse(self.cfm_facade_mock.method_calls)
63
64    def test_reboot_dut_with_restart(self):
65        action = actions.RebootDut(restart_chrome_for_cfm=True)
66        action.execute(self.context_with_mocks)
67        self.host_mock.reboot.assert_called_once_with()
68        (self.cfm_facade_mock.restart_chrome_for_cfm
69                .assert_called_once_with())
70        (self.cfm_facade_mock.wait_for_telemetry_commands
71                .assert_called_once_with())
72
73    def test_assert_usb_device_collector(self):
74        spec = usb_device_spec.UsbDeviceSpec(
75                'vid', 'pid', 'product', ['iface'])
76        action = actions.AssertUsbDevices([spec], lambda x: True)
77        action.execute(self.context_with_mocks)
78
79    def test_assert_usb_device_collector_matching_predicate(self):
80        spec = usb_device_spec.UsbDeviceSpec(
81                'vid', 'pid', 'product', ['iface'])
82        device = usb_device.UsbDevice(
83                'v', 'p', 'prod', ['if'], 1, 2, 1)
84        self.usb_device_collector_mock.get_devices_by_spec = mock.Mock(
85                return_value=[device])
86        action = actions.AssertUsbDevices(
87                [spec], lambda x: x[0].product_id == 'p')
88        action.execute(self.context_with_mocks)
89
90    def test_assert_usb_device_collector_non_matching_predicate(self):
91        spec = usb_device_spec.UsbDeviceSpec(
92                'vid', 'pid', 'product', ['iface'])
93        device = usb_device.UsbDevice(
94                'v', 'p', 'prod', ['if'], 1, 2, 1)
95        self.usb_device_collector_mock.get_devices_by_spec = mock.Mock(
96                return_value=[device])
97        action = actions.AssertUsbDevices(
98                [spec], lambda x: x[0].product_id == 'r')
99        self.assertRaises(AssertionError, lambda: action.execute(
100                self.context_with_mocks))
101
102    def test_assert_usb_device_collector_default_predicate(self):
103        self.usb_device_collector_mock.get_devices_by_spec = mock.Mock(
104                return_value=[USB_DEVICE])  # Default checks list is of size 1
105        action = actions.AssertUsbDevices([USB_DEVICE_SPEC])
106        action.execute(self.context_with_mocks)
107
108    def test_select_scenario_at_random(self):
109        placeholder_action1 = StubAction()
110        placeholder_action2 = StubAction()
111        scenarios = [
112                scenario.Scenario(placeholder_action1),
113                scenario.Scenario(placeholder_action2)
114        ]
115        action = actions.SelectScenarioAtRandom(scenarios, 10)
116        action.execute(self.context_with_mocks)
117        # Assert that our actions were executed the expected number of times.
118        total_executes = (placeholder_action1.executed_times +
119                          placeholder_action2.executed_times)
120        self.assertEqual(10, total_executes)
121
122    def test_select_scenario_at_random_str_contains_seed(self):
123        action = actions.SelectScenarioAtRandom([], 10, 123)
124        self.assertTrue('seed=123' in str(action))
125
126    def test_select_scenario_at_random_same_seed_same_actions(self):
127        scenario1_action1 = StubAction()
128        scenario1_action2 = StubAction()
129        scenarios1 = [scenario.Scenario(scenario1_action1),
130                     scenario.Scenario(scenario1_action2)]
131        scenario2_action1 = StubAction()
132        scenario2_action2 = StubAction()
133        scenarios2 = [scenario.Scenario(scenario2_action1),
134                     scenario.Scenario(scenario2_action2)]
135        action1 = actions.SelectScenarioAtRandom(scenarios1, 100, 0)
136        action2 = actions.SelectScenarioAtRandom(scenarios2, 100, 0)
137        action1.execute(self.context_with_mocks)
138        action2.execute(self.context_with_mocks)
139        self.assertEqual(scenario1_action1.executed_times,
140                         scenario2_action1.executed_times)
141        self.assertEqual(scenario1_action2.executed_times,
142                         scenario2_action2.executed_times)
143
144    def test_power_cycle_usb_port(self):
145        device = usb_device.UsbDevice(
146                'v', 'p', 'prod', ['if'], 1, 2, 1)
147        self.usb_device_collector_mock.get_devices_by_spec = mock.Mock(
148                side_effect=[[device, device], [device], [device, device]])
149        action = actions.PowerCycleUsbPort(
150                [USB_DEVICE_SPEC], 0, lambda x: [x[0]])
151        action.execute(self.context_with_mocks)
152        self.usb_port_manager_mock.set_port_power.assert_has_calls(
153                [mock.call([(1, 2)], False), mock.call([(1, 2)], True)])
154
155    def test_power_cycle_usb_port_device_does_not_turn_off(self):
156        # Return the same device all the time - i.e., it does not turn off.
157        self.usb_device_collector_mock.get_devices_by_spec = mock.Mock(
158                return_value=[USB_DEVICE])
159        action = actions.PowerCycleUsbPort([USB_DEVICE_SPEC], 0)
160        self.assertRaises(
161                actions.TimeoutError,
162                lambda: action.execute(self.context_with_mocks))
163
164    def test_power_cycle_usb_port_device_does_not_turn_on(self):
165        self.usb_device_collector_mock.get_devices_by_spec = mock.Mock(
166                side_effect=[[USB_DEVICE, USB_DEVICE], [], [USB_DEVICE]])
167        action = actions.PowerCycleUsbPort([USB_DEVICE_SPEC], 0)
168        self.assertRaises(
169                actions.TimeoutError,
170                lambda: action.execute(self.context_with_mocks))
171
172    def test_retry_action_success_after_retry(self):
173        action = actions.RetryAssertAction(RaisesFirstTimeAction(), 3, 0)
174        action.execute(self.context_with_mocks)
175
176    def test_retry_action_fail_when_no_more_retries(self):
177        action = actions.RetryAssertAction(RaisesFirstTimeAction(), 1)
178        self.assertRaises(
179                AssertionError, lambda: action.execute(self.context_with_mocks))
180
181    def test_assert_no_new_crashes(self):
182        action = actions.AssertNoNewCrashes()
183        self.crash_detector_mock.get_new_crash_files = mock.Mock(
184                return_value=[])
185        action.do_execute(self.context_with_mocks)
186
187    def test_assert_no_new_crashes_crash_detected(self):
188        action = actions.AssertNoNewCrashes()
189        self.crash_detector_mock.get_new_crash_files = mock.Mock(
190                return_value=['/a/new/crash/file'])
191        self.assertRaises(
192                AssertionError,
193                lambda: action.do_execute(self.context_with_mocks))
194
195    def test_start_metrics_colllection(self):
196        action = actions.StartPerfMetricsCollection()
197        action.execute(self.context_with_mocks)
198        self.metrics_collector_mock.start.assert_called_once_with()
199
200    def test_stop_metrics_colllection(self):
201        action = actions.StopPerfMetricsCollection()
202        action.execute(self.context_with_mocks)
203        self.metrics_collector_mock.stop.assert_called_once_with()
204
205    def test_upload_metrics(self):
206        action = actions.UploadPerfMetrics()
207        action.execute(self.context_with_mocks)
208        self.metrics_collector_mock.upload_metrics.assert_called_once_with()
209
210
211
212class FakeCollector(object):
213    def __init__(self, contents):
214        self.contents = contents
215
216    def collect_file_contents(self, path):
217        return self.contents
218
219
220class StubAction(actions.Action):
221    def __init__(self):
222        self.executed_times = 0
223
224    def do_execute(self, context):
225        self.executed_times += 1
226
227class RaisesFirstTimeAction(actions.Action):
228    def __init__(self):
229        self.executed = False
230
231    def do_execute(self, context):
232        if not self.executed:
233            self.executed = True
234            raise AssertionError()
235