1#!/usr/bin/python3 2# Copyright 2017 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6import itertools 7import unittest 8from unittest import mock 9 10import common 11from autotest_lib.client.common_lib import error 12from autotest_lib.client.common_lib import hosts 13from autotest_lib.client.common_lib.cros import retry 14from autotest_lib.server.hosts import cros_firmware 15from autotest_lib.server.hosts import cros_repair 16from autotest_lib.server.hosts import repair_utils 17 18 19CROS_VERIFY_DAG = ( 20 (repair_utils.PingVerifier, 'ping', ()), 21 (repair_utils.SshVerifier, 'ssh', ('ping', )), 22 (cros_repair.ServoUSBDriveVerifier, 'usb_drive', ()), 23 (cros_repair.DevDefaultBootVerifier, 'dev_default_boot', ('ssh', )), 24 (cros_repair.DevModeVerifier, 'devmode', ('ssh', )), 25 (cros_repair.EnrollmentStateVerifier, 'enrollment_state', ('ssh', )), 26 (cros_repair.HWIDVerifier, 'hwid', ('ssh', )), 27 (cros_repair.ACPowerVerifier, 'power', ('ssh', )), 28 (cros_repair.EXT4fsErrorVerifier, 'ext4', ('ssh', )), 29 (cros_repair.WritableVerifier, 'writable', ('ssh', )), 30 (cros_repair.TPMStatusVerifier, 'tpm', ('ssh', )), 31 (cros_repair.UpdateSuccessVerifier, 'good_provision', ('ssh', )), 32 (cros_repair.FirmwareTpmVerifier, 'faft_tpm', ('ssh', )), 33 (cros_firmware.FirmwareStatusVerifier, 'fwstatus', ('ssh', )), 34 (cros_firmware.FirmwareVersionVerifier, 'rwfw', ('ssh', )), 35 (cros_repair.PythonVerifier, 'python', ('ssh', )), 36 (repair_utils.LegacyHostVerifier, 'cros', ('ssh', )), 37 (cros_repair.ProvisioningLabelsVerifier, 'provisioning_labels', 38 ('ssh', )), 39 (cros_repair.StopStartUIVerifier, 'stop_start_ui', ('ssh', )), 40 (cros_repair.DUTStorageVerifier, 'storage', ('ssh', )), 41 (cros_repair.AuditBattery, 'audit_battery', ()), 42 (cros_repair.GscToolPresentVerifier, 'dut_gsctool', ('ssh', )), 43 (cros_repair.ServoKeyboardMapVerifier, 'dut_servo_keyboard', 44 ('ssh', )), 45 (cros_repair.ServoMacAddressVerifier, 'dut_servo_macaddr', ('ssh', )), 46) 47 48CROS_REPAIR_ACTIONS = ( 49 (repair_utils.RPMCycleRepair, 'rpm', (), ( 50 'ping', 51 'ssh', 52 'power', 53 )), 54 (cros_repair.ServoResetRepair, 'servoreset', (), ( 55 'ping', 56 'ssh', 57 'stop_start_ui', 58 'power', 59 )), 60 (cros_repair.ServoCr50RebootRepair, 'cr50_reset', (), 61 ('ping', 'ssh', 'stop_start_ui', 'power')), 62 (cros_repair.ServoSysRqRepair, 'sysrq', (), ( 63 'ping', 64 'ssh', 65 )), 66 (cros_repair.ProvisioningLabelsRepair, 'provisioning_labels_repair', 67 ('ssh', ), ('provisioning_labels', )), 68 (cros_firmware.FaftFirmwareRepair, 'faft_firmware_repair', (), 69 ('ping', 'ssh', 'fwstatus', 'good_provision')), 70 (cros_repair.DevDefaultBootRepair, 'set_default_boot', ('ssh', ), 71 ('dev_default_boot', )), 72 (cros_repair.CrosRebootRepair, 'reboot', ('ssh', ), ( 73 'devmode', 74 'writable', 75 )), 76 (cros_repair.EnrollmentCleanupRepair, 'cleanup_enrollment', ('ssh', ), 77 ('enrollment_state', )), 78 (cros_firmware.GeneralFirmwareRepair, 'general_firmware', 79 ('usb_drive', ), ( 80 'ping', 81 'ssh', 82 )), 83 (cros_repair.RecoverACPowerRepair, 'ac_recover', (), ('ping', 84 'power')), 85 (cros_repair.ProvisionRepair, 'provision', 86 ('ping', 'ssh', 'writable', 'tpm', 'good_provision', 87 'ext4'), ('power', 'rwfw', 'fwstatus', 'python', 'hwid', 'cros', 88 'dev_default_boot', 'stop_start_ui', 'dut_gsctool')), 89 (cros_repair.PowerWashRepair, 'powerwash', ('ping', 'ssh', 'writable'), 90 ('tpm', 'good_provision', 'ext4', 'power', 'rwfw', 'fwstatus', 91 'python', 'hwid', 'cros', 'dev_default_boot', 'stop_start_ui', 92 'dut_gsctool')), 93 (cros_repair.ServoInstallRepair, 'usb', ('usb_drive', ), 94 ('ping', 'ssh', 'writable', 'tpm', 'good_provision', 'ext4', 'power', 95 'rwfw', 'fwstatus', 'python', 'hwid', 'cros', 'dev_default_boot', 96 'stop_start_ui', 'dut_gsctool', 'faft_tpm')), 97 (cros_repair.ServoResetAfterUSBRepair, 'servo_reset_after_usb', 98 ('usb_drive', ), ('ping', 'ssh')), 99 (cros_repair.RecoverFwAfterUSBRepair, 'recover_fw_after_usb', 100 ('usb_drive', ), ('ping', 'ssh')), 101) 102 103MOBLAB_VERIFY_DAG = ( 104 (repair_utils.SshVerifier, 'ssh', ()), 105 (cros_repair.ACPowerVerifier, 'power', ('ssh',)), 106 (cros_repair.PythonVerifier, 'python', ('ssh',)), 107 (repair_utils.LegacyHostVerifier, 'cros', ('ssh',)), 108) 109 110MOBLAB_REPAIR_ACTIONS = ( 111 (repair_utils.RPMCycleRepair, 'rpm', (), ('ssh', 'power',)), 112 (cros_repair.ProvisionRepair, 113 'provision', ('ssh',), ('power', 'python', 'cros',)), 114) 115 116JETSTREAM_VERIFY_DAG = ( 117 (repair_utils.PingVerifier, 'ping', ()), 118 (repair_utils.SshVerifier, 'ssh', ('ping', )), 119 (cros_repair.ServoUSBDriveVerifier, 'usb_drive', ()), 120 (cros_repair.DevDefaultBootVerifier, 'dev_default_boot', ('ssh', )), 121 (cros_repair.DevModeVerifier, 'devmode', ('ssh', )), 122 (cros_repair.EnrollmentStateVerifier, 'enrollment_state', ('ssh', )), 123 (cros_repair.HWIDVerifier, 'hwid', ('ssh', )), 124 (cros_repair.ACPowerVerifier, 'power', ('ssh', )), 125 (cros_repair.EXT4fsErrorVerifier, 'ext4', ('ssh', )), 126 (cros_repair.WritableVerifier, 'writable', ('ssh', )), 127 (cros_repair.TPMStatusVerifier, 'tpm', ('ssh', )), 128 (cros_repair.UpdateSuccessVerifier, 'good_provision', ('ssh', )), 129 (cros_repair.FirmwareTpmVerifier, 'faft_tpm', ('ssh', )), 130 (cros_firmware.FirmwareStatusVerifier, 'fwstatus', ('ssh', )), 131 (cros_firmware.FirmwareVersionVerifier, 'rwfw', ('ssh', )), 132 (cros_repair.PythonVerifier, 'python', ('ssh', )), 133 (repair_utils.LegacyHostVerifier, 'cros', ('ssh', )), 134 (cros_repair.ProvisioningLabelsVerifier, 'provisioning_labels', 135 ('ssh', )), 136 (cros_repair.JetstreamTpmVerifier, 'jetstream_tpm', ('ssh', )), 137 (cros_repair.JetstreamAttestationVerifier, 'jetstream_attestation', 138 ('ssh', )), 139 (cros_repair.JetstreamServicesVerifier, 'jetstream_services', 140 ('ssh', )), 141) 142 143JETSTREAM_REPAIR_ACTIONS = ( 144 (repair_utils.RPMCycleRepair, 'rpm', (), ( 145 'ping', 146 'ssh', 147 'power', 148 )), 149 (cros_repair.ServoResetRepair, 'servoreset', (), ( 150 'ping', 151 'ssh', 152 )), 153 (cros_repair.ServoCr50RebootRepair, 'cr50_reset', (), ( 154 'ping', 155 'ssh', 156 )), 157 (cros_repair.ServoSysRqRepair, 'sysrq', (), ( 158 'ping', 159 'ssh', 160 )), 161 (cros_repair.ProvisioningLabelsRepair, 'provisioning_labels_repair', 162 ('ssh', ), ('provisioning_labels', )), 163 (cros_firmware.FaftFirmwareRepair, 'faft_firmware_repair', (), 164 ('ping', 'ssh', 'fwstatus', 'good_provision')), 165 (cros_repair.DevDefaultBootRepair, 'set_default_boot', ('ssh', ), 166 ('dev_default_boot', )), 167 (cros_repair.CrosRebootRepair, 'reboot', ('ssh', ), ( 168 'devmode', 169 'writable', 170 )), 171 (cros_repair.EnrollmentCleanupRepair, 'cleanup_enrollment', ('ssh', ), 172 ('enrollment_state', )), 173 (cros_repair.JetstreamTpmRepair, 'jetstream_tpm_repair', 174 ('ping', 'ssh', 'writable', 'tpm', 'good_provision', 'ext4'), 175 ('power', 'rwfw', 'fwstatus', 'python', 'hwid', 'cros', 176 'dev_default_boot', 'jetstream_tpm', 'jetstream_attestation')), 177 (cros_repair.JetstreamServiceRepair, 'jetstream_service_repair', 178 ('ping', 'ssh', 'writable', 'tpm', 'good_provision', 'ext4', 179 'jetstream_tpm', 'jetstream_attestation'), 180 ('power', 'rwfw', 'fwstatus', 'python', 'hwid', 'cros', 181 'dev_default_boot', 'jetstream_tpm', 'jetstream_attestation', 182 'jetstream_services')), 183 (cros_repair.ProvisionRepair, 'provision', 184 ('ping', 'ssh', 'writable', 'tpm', 'good_provision', 185 'ext4'), ('power', 'rwfw', 'fwstatus', 'python', 'hwid', 'cros', 186 'dev_default_boot', 'jetstream_tpm', 187 'jetstream_attestation', 'jetstream_services')), 188 (cros_repair.PowerWashRepair, 'powerwash', ('ping', 'ssh', 'writable'), 189 ('tpm', 'good_provision', 'ext4', 'power', 'rwfw', 'fwstatus', 190 'python', 'hwid', 'cros', 'dev_default_boot', 'jetstream_tpm', 191 'jetstream_attestation', 'jetstream_services')), 192 (cros_repair.ServoInstallRepair, 'usb', ('usb_drive', ), ( 193 'ping', 194 'ssh', 195 'writable', 196 'tpm', 197 'good_provision', 198 'ext4', 199 'power', 200 'rwfw', 201 'fwstatus', 202 'python', 203 'hwid', 204 'cros', 205 'dev_default_boot', 206 'jetstream_tpm', 207 'jetstream_attestation', 208 'jetstream_services', 209 'faft_tpm', 210 )), 211) 212 213TPM_STATUS_OWNED = """ 214Message Reply: [tpm_manager.GetTpmNonsensitiveStatusReply] { 215 status: STATUS_SUCCESS 216 is_enabled: true 217 is_owned: true 218 is_owner_password_present: true 219 has_reset_lock_permissions: true 220 is_srk_default_auth: true 221} 222""" 223 224TPM_STATUS_NOT_OWNED = """ 225Message Reply: [tpm_manager.GetTpmNonsensitiveStatusReply] { 226 status: STATUS_SUCCESS 227 is_enabled: true 228 is_owned: false 229 is_owner_password_present: false 230 has_reset_lock_permissions: false 231 is_srk_default_auth: true 232} 233""" 234 235TPM_STATUS_CANNOT_LOAD_SRK = """ 236Message Reply: [tpm_manager.GetTpmNonsensitiveStatusReply] { 237 status: STATUS_SUCCESS 238 is_enabled: true 239 is_owned: true 240 is_owner_password_present: false 241 has_reset_lock_permissions: false 242 is_srk_default_auth: false 243} 244""" 245 246TPM_STATUS_READY = """ 247TPM Enabled: true 248TPM Owned: true 249TPM Being Owned: false 250TPM Ready: true 251TPM Password: 9eaee4da8b4c 252""" 253 254TPM_STATUS_NOT_READY = """ 255TPM Enabled: true 256TPM Owned: false 257TPM Being Owned: true 258TPM Ready: false 259TPM Password: 260""" 261 262 263class CrosRepairUnittests(unittest.TestCase): 264 # pylint: disable=missing-docstring 265 266 maxDiff = None 267 268 def test_cros_repair(self): 269 verify_dag = cros_repair._cros_verify_dag() 270 self.assertTupleEqual(verify_dag, CROS_VERIFY_DAG) 271 self.check_verify_dag(verify_dag) 272 repair_actions = cros_repair._cros_repair_actions() 273 self.assertTupleEqual(repair_actions, CROS_REPAIR_ACTIONS) 274 self.check_repair_actions(verify_dag, repair_actions) 275 276 def test_moblab_repair(self): 277 verify_dag = cros_repair._moblab_verify_dag() 278 self.assertTupleEqual(verify_dag, MOBLAB_VERIFY_DAG) 279 self.check_verify_dag(verify_dag) 280 repair_actions = cros_repair._moblab_repair_actions() 281 self.assertTupleEqual(repair_actions, MOBLAB_REPAIR_ACTIONS) 282 self.check_repair_actions(verify_dag, repair_actions) 283 284 def test_jetstream_repair(self): 285 verify_dag = cros_repair._jetstream_verify_dag() 286 self.assertTupleEqual(verify_dag, JETSTREAM_VERIFY_DAG) 287 self.check_verify_dag(verify_dag) 288 repair_actions = cros_repair._jetstream_repair_actions() 289 self.assertTupleEqual(repair_actions, JETSTREAM_REPAIR_ACTIONS) 290 self.check_repair_actions(verify_dag, repair_actions) 291 292 def check_verify_dag(self, verify_dag): 293 """Checks that dependency labels are defined.""" 294 labels = [n[1] for n in verify_dag] 295 for node in verify_dag: 296 for dep in node[2]: 297 self.assertIn(dep, labels) 298 299 def check_repair_actions(self, verify_dag, repair_actions): 300 """Checks that dependency and trigger labels are defined.""" 301 verify_labels = [n[1] for n in verify_dag] 302 for action in repair_actions: 303 deps = action[2] 304 triggers = action[3] 305 for label in deps + triggers: 306 self.assertIn(label, verify_labels) 307 308 def test_get_tpm_status_owned(self): 309 mock_host = mock.Mock() 310 mock_host.run.return_value.stdout = TPM_STATUS_OWNED 311 status = cros_repair.TpmStatus(mock_host) 312 self.assertTrue(status.tpm_enabled) 313 self.assertTrue(status.tpm_owned) 314 self.assertTrue(status.tpm_can_load_srk) 315 self.assertTrue(status.tpm_can_load_srk_pubkey) 316 317 def test_get_tpm_status_not_owned(self): 318 mock_host = mock.Mock() 319 mock_host.run.return_value.stdout = TPM_STATUS_NOT_OWNED 320 status = cros_repair.TpmStatus(mock_host) 321 self.assertTrue(status.tpm_enabled) 322 self.assertFalse(status.tpm_owned) 323 self.assertFalse(status.tpm_can_load_srk) 324 self.assertFalse(status.tpm_can_load_srk_pubkey) 325 326 @mock.patch.object(cros_repair, '_is_virtual_machine') 327 def test_tpm_status_verifier_owned(self, mock_is_virt): 328 mock_is_virt.return_value = False 329 mock_host = mock.Mock() 330 mock_host.run.return_value.stdout = TPM_STATUS_OWNED 331 tpm_verifier = cros_repair.TPMStatusVerifier('test', []) 332 tpm_verifier.verify(mock_host) 333 334 @mock.patch.object(cros_repair, '_is_virtual_machine') 335 def test_tpm_status_verifier_not_owned(self, mock_is_virt): 336 mock_is_virt.return_value = False 337 mock_host = mock.Mock() 338 mock_host.run.return_value.stdout = TPM_STATUS_NOT_OWNED 339 tpm_verifier = cros_repair.TPMStatusVerifier('test', []) 340 tpm_verifier.verify(mock_host) 341 342 @mock.patch.object(cros_repair, '_is_virtual_machine') 343 def test_tpm_status_verifier_cannot_load_srk_pubkey(self, mock_is_virt): 344 mock_is_virt.return_value = False 345 mock_host = mock.Mock() 346 mock_host.run.return_value.stdout = TPM_STATUS_CANNOT_LOAD_SRK 347 tpm_verifier = cros_repair.TPMStatusVerifier('test', []) 348 with self.assertRaises(hosts.AutoservVerifyError) as ctx: 349 tpm_verifier.verify(mock_host) 350 self.assertEqual('Cannot load the TPM SRK', str(ctx.exception)) 351 352 def test_jetstream_tpm_owned(self): 353 mock_host = mock.Mock() 354 mock_host.run.side_effect = [ 355 mock.Mock(stdout=TPM_STATUS_OWNED), 356 mock.Mock(stdout=TPM_STATUS_READY), 357 ] 358 tpm_verifier = cros_repair.JetstreamTpmVerifier('test', []) 359 tpm_verifier.verify(mock_host) 360 361 @mock.patch.object(retry.logging, 'warning') 362 @mock.patch.object(retry.time, 'time') 363 @mock.patch.object(retry.time, 'sleep') 364 def test_jetstream_tpm_not_owned(self, mock_sleep, mock_time, mock_logging): 365 mock_time.side_effect = itertools.count(0, 20) 366 mock_host = mock.Mock() 367 mock_host.run.return_value.stdout = TPM_STATUS_NOT_OWNED 368 tpm_verifier = cros_repair.JetstreamTpmVerifier('test', []) 369 with self.assertRaises(hosts.AutoservVerifyError) as ctx: 370 tpm_verifier.verify(mock_host) 371 self.assertEqual('TPM is not owned', str(ctx.exception)) 372 373 @mock.patch.object(retry.logging, 'warning') 374 @mock.patch.object(retry.time, 'time') 375 @mock.patch.object(retry.time, 'sleep') 376 def test_jetstream_tpm_not_ready(self, mock_sleep, mock_time, mock_logging): 377 mock_time.side_effect = itertools.count(0, 20) 378 mock_host = mock.Mock() 379 mock_host.run.side_effect = itertools.cycle([ 380 mock.Mock(stdout=TPM_STATUS_OWNED), 381 mock.Mock(stdout=TPM_STATUS_NOT_READY), 382 ]) 383 tpm_verifier = cros_repair.JetstreamTpmVerifier('test', []) 384 with self.assertRaises(hosts.AutoservVerifyError) as ctx: 385 tpm_verifier.verify(mock_host) 386 self.assertEqual('TPM is not ready', str(ctx.exception)) 387 388 @mock.patch.object(retry.logging, 'warning') 389 @mock.patch.object(retry.time, 'time') 390 @mock.patch.object(retry.time, 'sleep') 391 def test_jetstream_tpm_missing(self, mock_sleep, mock_time, mock_logging): 392 mock_time.side_effect = itertools.count(0, 20) 393 mock_host = mock.Mock() 394 mock_host.run.side_effect = error.AutoservRunError('test', None) 395 tpm_verifier = cros_repair.JetstreamTpmVerifier('test', []) 396 with self.assertRaises(hosts.AutoservVerifyError) as ctx: 397 tpm_verifier.verify(mock_host) 398 self.assertEqual('Could not determine TPM status', str(ctx.exception)) 399 400 401if __name__ == '__main__': 402 unittest.main() 403