1import unittest 2from unittest import mock 3 4from autotest_lib.client.common_lib.cros.cfm.usb import usb_device 5from autotest_lib.client.common_lib.cros.cfm.usb import usb_device_spec 6from autotest_lib.server.cros.cfm.configurable_test import actions 7from autotest_lib.server.cros.cfm.configurable_test import action_context 8from autotest_lib.server.cros.cfm.configurable_test import scenario 9 10# Constants to use in case the actual values are irrelevant. 11USB_DEVICE_SPEC = usb_device_spec.UsbDeviceSpec( 12 'vid', 'pid', 'product', ['iface']) 13 14USB_DEVICE = usb_device.UsbDevice('v', 'p', 'prod', ['if'], 1, 2, 1) 15 16 17# Test, disable missing-docstring 18# pylint: disable=missing-docstring 19class TestActions(unittest.TestCase): 20 """ 21 Tests for the available actions for configurable CFM tests to run. 22 """ 23 24 def setUp(self): 25 self.host_mock = mock.MagicMock() 26 self.cfm_facade_mock = mock.MagicMock() 27 self.usb_device_collector_mock = mock.MagicMock() 28 self.usb_port_manager_mock = mock.MagicMock() 29 self.crash_detector_mock = mock.MagicMock() 30 self.metrics_collector_mock = mock.MagicMock() 31 self.context_with_mocks = action_context.ActionContext( 32 host=self.host_mock, 33 cfm_facade=self.cfm_facade_mock, 34 usb_device_collector=self.usb_device_collector_mock, 35 usb_port_manager=self.usb_port_manager_mock, 36 crash_detector=self.crash_detector_mock, 37 perf_metrics_collector=self.metrics_collector_mock) 38 39 40 def test_assert_file_does_not_contain_no_match(self): 41 action = actions.AssertFileDoesNotContain('/foo', ['EE', 'WW']) 42 context = action_context.ActionContext( 43 file_contents_collector=FakeCollector('abc\ndef')) 44 action.execute(context) 45 46 def test_assert_file_does_not_contain_match(self): 47 action = actions.AssertFileDoesNotContain('/foo', ['EE', 'WW']) 48 context = action_context.ActionContext( 49 file_contents_collector=FakeCollector('abc\naWWd')) 50 self.assertRaises(AssertionError, lambda: action.execute(context)) 51 52 def test_assert_file_does_not_contain_regex_match(self): 53 action = actions.AssertFileDoesNotContain('/foo', ['EE', 'W{3}Q+']) 54 context = action_context.ActionContext( 55 file_contents_collector=FakeCollector('abc\naWWWQQd')) 56 self.assertRaises(AssertionError, lambda: action.execute(context)) 57 58 def test_reboot_dut_no_restart(self): 59 action = actions.RebootDut() 60 action.execute(self.context_with_mocks) 61 self.host_mock.reboot.assert_called_once_with() 62 self.assertFalse(self.cfm_facade_mock.method_calls) 63 64 def test_reboot_dut_with_restart(self): 65 action = actions.RebootDut(restart_chrome_for_cfm=True) 66 action.execute(self.context_with_mocks) 67 self.host_mock.reboot.assert_called_once_with() 68 (self.cfm_facade_mock.restart_chrome_for_cfm 69 .assert_called_once_with()) 70 (self.cfm_facade_mock.wait_for_telemetry_commands 71 .assert_called_once_with()) 72 73 def test_assert_usb_device_collector(self): 74 spec = usb_device_spec.UsbDeviceSpec( 75 'vid', 'pid', 'product', ['iface']) 76 action = actions.AssertUsbDevices([spec], lambda x: True) 77 action.execute(self.context_with_mocks) 78 79 def test_assert_usb_device_collector_matching_predicate(self): 80 spec = usb_device_spec.UsbDeviceSpec( 81 'vid', 'pid', 'product', ['iface']) 82 device = usb_device.UsbDevice( 83 'v', 'p', 'prod', ['if'], 1, 2, 1) 84 self.usb_device_collector_mock.get_devices_by_spec = mock.Mock( 85 return_value=[device]) 86 action = actions.AssertUsbDevices( 87 [spec], lambda x: x[0].product_id == 'p') 88 action.execute(self.context_with_mocks) 89 90 def test_assert_usb_device_collector_non_matching_predicate(self): 91 spec = usb_device_spec.UsbDeviceSpec( 92 'vid', 'pid', 'product', ['iface']) 93 device = usb_device.UsbDevice( 94 'v', 'p', 'prod', ['if'], 1, 2, 1) 95 self.usb_device_collector_mock.get_devices_by_spec = mock.Mock( 96 return_value=[device]) 97 action = actions.AssertUsbDevices( 98 [spec], lambda x: x[0].product_id == 'r') 99 self.assertRaises(AssertionError, lambda: action.execute( 100 self.context_with_mocks)) 101 102 def test_assert_usb_device_collector_default_predicate(self): 103 self.usb_device_collector_mock.get_devices_by_spec = mock.Mock( 104 return_value=[USB_DEVICE]) # Default checks list is of size 1 105 action = actions.AssertUsbDevices([USB_DEVICE_SPEC]) 106 action.execute(self.context_with_mocks) 107 108 def test_select_scenario_at_random(self): 109 placeholder_action1 = StubAction() 110 placeholder_action2 = StubAction() 111 scenarios = [ 112 scenario.Scenario(placeholder_action1), 113 scenario.Scenario(placeholder_action2) 114 ] 115 action = actions.SelectScenarioAtRandom(scenarios, 10) 116 action.execute(self.context_with_mocks) 117 # Assert that our actions were executed the expected number of times. 118 total_executes = (placeholder_action1.executed_times + 119 placeholder_action2.executed_times) 120 self.assertEqual(10, total_executes) 121 122 def test_select_scenario_at_random_str_contains_seed(self): 123 action = actions.SelectScenarioAtRandom([], 10, 123) 124 self.assertTrue('seed=123' in str(action)) 125 126 def test_select_scenario_at_random_same_seed_same_actions(self): 127 scenario1_action1 = StubAction() 128 scenario1_action2 = StubAction() 129 scenarios1 = [scenario.Scenario(scenario1_action1), 130 scenario.Scenario(scenario1_action2)] 131 scenario2_action1 = StubAction() 132 scenario2_action2 = StubAction() 133 scenarios2 = [scenario.Scenario(scenario2_action1), 134 scenario.Scenario(scenario2_action2)] 135 action1 = actions.SelectScenarioAtRandom(scenarios1, 100, 0) 136 action2 = actions.SelectScenarioAtRandom(scenarios2, 100, 0) 137 action1.execute(self.context_with_mocks) 138 action2.execute(self.context_with_mocks) 139 self.assertEqual(scenario1_action1.executed_times, 140 scenario2_action1.executed_times) 141 self.assertEqual(scenario1_action2.executed_times, 142 scenario2_action2.executed_times) 143 144 def test_power_cycle_usb_port(self): 145 device = usb_device.UsbDevice( 146 'v', 'p', 'prod', ['if'], 1, 2, 1) 147 self.usb_device_collector_mock.get_devices_by_spec = mock.Mock( 148 side_effect=[[device, device], [device], [device, device]]) 149 action = actions.PowerCycleUsbPort( 150 [USB_DEVICE_SPEC], 0, lambda x: [x[0]]) 151 action.execute(self.context_with_mocks) 152 self.usb_port_manager_mock.set_port_power.assert_has_calls( 153 [mock.call([(1, 2)], False), mock.call([(1, 2)], True)]) 154 155 def test_power_cycle_usb_port_device_does_not_turn_off(self): 156 # Return the same device all the time - i.e., it does not turn off. 157 self.usb_device_collector_mock.get_devices_by_spec = mock.Mock( 158 return_value=[USB_DEVICE]) 159 action = actions.PowerCycleUsbPort([USB_DEVICE_SPEC], 0) 160 self.assertRaises( 161 actions.TimeoutError, 162 lambda: action.execute(self.context_with_mocks)) 163 164 def test_power_cycle_usb_port_device_does_not_turn_on(self): 165 self.usb_device_collector_mock.get_devices_by_spec = mock.Mock( 166 side_effect=[[USB_DEVICE, USB_DEVICE], [], [USB_DEVICE]]) 167 action = actions.PowerCycleUsbPort([USB_DEVICE_SPEC], 0) 168 self.assertRaises( 169 actions.TimeoutError, 170 lambda: action.execute(self.context_with_mocks)) 171 172 def test_retry_action_success_after_retry(self): 173 action = actions.RetryAssertAction(RaisesFirstTimeAction(), 3, 0) 174 action.execute(self.context_with_mocks) 175 176 def test_retry_action_fail_when_no_more_retries(self): 177 action = actions.RetryAssertAction(RaisesFirstTimeAction(), 1) 178 self.assertRaises( 179 AssertionError, lambda: action.execute(self.context_with_mocks)) 180 181 def test_assert_no_new_crashes(self): 182 action = actions.AssertNoNewCrashes() 183 self.crash_detector_mock.get_new_crash_files = mock.Mock( 184 return_value=[]) 185 action.do_execute(self.context_with_mocks) 186 187 def test_assert_no_new_crashes_crash_detected(self): 188 action = actions.AssertNoNewCrashes() 189 self.crash_detector_mock.get_new_crash_files = mock.Mock( 190 return_value=['/a/new/crash/file']) 191 self.assertRaises( 192 AssertionError, 193 lambda: action.do_execute(self.context_with_mocks)) 194 195 def test_start_metrics_colllection(self): 196 action = actions.StartPerfMetricsCollection() 197 action.execute(self.context_with_mocks) 198 self.metrics_collector_mock.start.assert_called_once_with() 199 200 def test_stop_metrics_colllection(self): 201 action = actions.StopPerfMetricsCollection() 202 action.execute(self.context_with_mocks) 203 self.metrics_collector_mock.stop.assert_called_once_with() 204 205 def test_upload_metrics(self): 206 action = actions.UploadPerfMetrics() 207 action.execute(self.context_with_mocks) 208 self.metrics_collector_mock.upload_metrics.assert_called_once_with() 209 210 211 212class FakeCollector(object): 213 def __init__(self, contents): 214 self.contents = contents 215 216 def collect_file_contents(self, path): 217 return self.contents 218 219 220class StubAction(actions.Action): 221 def __init__(self): 222 self.executed_times = 0 223 224 def do_execute(self, context): 225 self.executed_times += 1 226 227class RaisesFirstTimeAction(actions.Action): 228 def __init__(self): 229 self.executed = False 230 231 def do_execute(self, context): 232 if not self.executed: 233 self.executed = True 234 raise AssertionError() 235