xref: /aosp_15_r20/external/autotest/server/hosts/cros_repair_unittest.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1#!/usr/bin/python3
2# Copyright 2017 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import itertools
7import unittest
8from unittest import mock
9
10import common
11from autotest_lib.client.common_lib import error
12from autotest_lib.client.common_lib import hosts
13from autotest_lib.client.common_lib.cros import retry
14from autotest_lib.server.hosts import cros_firmware
15from autotest_lib.server.hosts import cros_repair
16from autotest_lib.server.hosts import repair_utils
17
18
19CROS_VERIFY_DAG = (
20        (repair_utils.PingVerifier, 'ping', ()),
21        (repair_utils.SshVerifier, 'ssh', ('ping', )),
22        (cros_repair.ServoUSBDriveVerifier, 'usb_drive', ()),
23        (cros_repair.DevDefaultBootVerifier, 'dev_default_boot', ('ssh', )),
24        (cros_repair.DevModeVerifier, 'devmode', ('ssh', )),
25        (cros_repair.EnrollmentStateVerifier, 'enrollment_state', ('ssh', )),
26        (cros_repair.HWIDVerifier, 'hwid', ('ssh', )),
27        (cros_repair.ACPowerVerifier, 'power', ('ssh', )),
28        (cros_repair.EXT4fsErrorVerifier, 'ext4', ('ssh', )),
29        (cros_repair.WritableVerifier, 'writable', ('ssh', )),
30        (cros_repair.TPMStatusVerifier, 'tpm', ('ssh', )),
31        (cros_repair.UpdateSuccessVerifier, 'good_provision', ('ssh', )),
32        (cros_repair.FirmwareTpmVerifier, 'faft_tpm', ('ssh', )),
33        (cros_firmware.FirmwareStatusVerifier, 'fwstatus', ('ssh', )),
34        (cros_firmware.FirmwareVersionVerifier, 'rwfw', ('ssh', )),
35        (cros_repair.PythonVerifier, 'python', ('ssh', )),
36        (repair_utils.LegacyHostVerifier, 'cros', ('ssh', )),
37        (cros_repair.ProvisioningLabelsVerifier, 'provisioning_labels',
38         ('ssh', )),
39        (cros_repair.StopStartUIVerifier, 'stop_start_ui', ('ssh', )),
40        (cros_repair.DUTStorageVerifier, 'storage', ('ssh', )),
41        (cros_repair.AuditBattery, 'audit_battery', ()),
42        (cros_repair.GscToolPresentVerifier, 'dut_gsctool', ('ssh', )),
43        (cros_repair.ServoKeyboardMapVerifier, 'dut_servo_keyboard',
44         ('ssh', )),
45        (cros_repair.ServoMacAddressVerifier, 'dut_servo_macaddr', ('ssh', )),
46)
47
48CROS_REPAIR_ACTIONS = (
49        (repair_utils.RPMCycleRepair, 'rpm', (), (
50                'ping',
51                'ssh',
52                'power',
53        )),
54        (cros_repair.ServoResetRepair, 'servoreset', (), (
55                'ping',
56                'ssh',
57                'stop_start_ui',
58                'power',
59        )),
60        (cros_repair.ServoCr50RebootRepair, 'cr50_reset', (),
61         ('ping', 'ssh', 'stop_start_ui', 'power')),
62        (cros_repair.ServoSysRqRepair, 'sysrq', (), (
63                'ping',
64                'ssh',
65        )),
66        (cros_repair.ProvisioningLabelsRepair, 'provisioning_labels_repair',
67         ('ssh', ), ('provisioning_labels', )),
68        (cros_firmware.FaftFirmwareRepair, 'faft_firmware_repair', (),
69         ('ping', 'ssh', 'fwstatus', 'good_provision')),
70        (cros_repair.DevDefaultBootRepair, 'set_default_boot', ('ssh', ),
71         ('dev_default_boot', )),
72        (cros_repair.CrosRebootRepair, 'reboot', ('ssh', ), (
73                'devmode',
74                'writable',
75        )),
76        (cros_repair.EnrollmentCleanupRepair, 'cleanup_enrollment', ('ssh', ),
77         ('enrollment_state', )),
78        (cros_firmware.GeneralFirmwareRepair, 'general_firmware',
79         ('usb_drive', ), (
80                 'ping',
81                 'ssh',
82         )),
83        (cros_repair.RecoverACPowerRepair, 'ac_recover', (), ('ping',
84                                                              'power')),
85        (cros_repair.ProvisionRepair, 'provision',
86         ('ping', 'ssh', 'writable', 'tpm', 'good_provision',
87          'ext4'), ('power', 'rwfw', 'fwstatus', 'python', 'hwid', 'cros',
88                    'dev_default_boot', 'stop_start_ui', 'dut_gsctool')),
89        (cros_repair.PowerWashRepair, 'powerwash', ('ping', 'ssh', 'writable'),
90         ('tpm', 'good_provision', 'ext4', 'power', 'rwfw', 'fwstatus',
91          'python', 'hwid', 'cros', 'dev_default_boot', 'stop_start_ui',
92          'dut_gsctool')),
93        (cros_repair.ServoInstallRepair, 'usb', ('usb_drive', ),
94         ('ping', 'ssh', 'writable', 'tpm', 'good_provision', 'ext4', 'power',
95          'rwfw', 'fwstatus', 'python', 'hwid', 'cros', 'dev_default_boot',
96          'stop_start_ui', 'dut_gsctool', 'faft_tpm')),
97        (cros_repair.ServoResetAfterUSBRepair, 'servo_reset_after_usb',
98         ('usb_drive', ), ('ping', 'ssh')),
99        (cros_repair.RecoverFwAfterUSBRepair, 'recover_fw_after_usb',
100         ('usb_drive', ), ('ping', 'ssh')),
101)
102
103MOBLAB_VERIFY_DAG = (
104    (repair_utils.SshVerifier, 'ssh', ()),
105    (cros_repair.ACPowerVerifier, 'power', ('ssh',)),
106    (cros_repair.PythonVerifier, 'python', ('ssh',)),
107    (repair_utils.LegacyHostVerifier, 'cros', ('ssh',)),
108)
109
110MOBLAB_REPAIR_ACTIONS = (
111    (repair_utils.RPMCycleRepair, 'rpm', (), ('ssh', 'power',)),
112    (cros_repair.ProvisionRepair,
113     'provision', ('ssh',), ('power', 'python', 'cros',)),
114)
115
116JETSTREAM_VERIFY_DAG = (
117        (repair_utils.PingVerifier, 'ping', ()),
118        (repair_utils.SshVerifier, 'ssh', ('ping', )),
119        (cros_repair.ServoUSBDriveVerifier, 'usb_drive', ()),
120        (cros_repair.DevDefaultBootVerifier, 'dev_default_boot', ('ssh', )),
121        (cros_repair.DevModeVerifier, 'devmode', ('ssh', )),
122        (cros_repair.EnrollmentStateVerifier, 'enrollment_state', ('ssh', )),
123        (cros_repair.HWIDVerifier, 'hwid', ('ssh', )),
124        (cros_repair.ACPowerVerifier, 'power', ('ssh', )),
125        (cros_repair.EXT4fsErrorVerifier, 'ext4', ('ssh', )),
126        (cros_repair.WritableVerifier, 'writable', ('ssh', )),
127        (cros_repair.TPMStatusVerifier, 'tpm', ('ssh', )),
128        (cros_repair.UpdateSuccessVerifier, 'good_provision', ('ssh', )),
129        (cros_repair.FirmwareTpmVerifier, 'faft_tpm', ('ssh', )),
130        (cros_firmware.FirmwareStatusVerifier, 'fwstatus', ('ssh', )),
131        (cros_firmware.FirmwareVersionVerifier, 'rwfw', ('ssh', )),
132        (cros_repair.PythonVerifier, 'python', ('ssh', )),
133        (repair_utils.LegacyHostVerifier, 'cros', ('ssh', )),
134        (cros_repair.ProvisioningLabelsVerifier, 'provisioning_labels',
135         ('ssh', )),
136        (cros_repair.JetstreamTpmVerifier, 'jetstream_tpm', ('ssh', )),
137        (cros_repair.JetstreamAttestationVerifier, 'jetstream_attestation',
138         ('ssh', )),
139        (cros_repair.JetstreamServicesVerifier, 'jetstream_services',
140         ('ssh', )),
141)
142
143JETSTREAM_REPAIR_ACTIONS = (
144        (repair_utils.RPMCycleRepair, 'rpm', (), (
145                'ping',
146                'ssh',
147                'power',
148        )),
149        (cros_repair.ServoResetRepair, 'servoreset', (), (
150                'ping',
151                'ssh',
152        )),
153        (cros_repair.ServoCr50RebootRepair, 'cr50_reset', (), (
154                'ping',
155                'ssh',
156        )),
157        (cros_repair.ServoSysRqRepair, 'sysrq', (), (
158                'ping',
159                'ssh',
160        )),
161        (cros_repair.ProvisioningLabelsRepair, 'provisioning_labels_repair',
162         ('ssh', ), ('provisioning_labels', )),
163        (cros_firmware.FaftFirmwareRepair, 'faft_firmware_repair', (),
164         ('ping', 'ssh', 'fwstatus', 'good_provision')),
165        (cros_repair.DevDefaultBootRepair, 'set_default_boot', ('ssh', ),
166         ('dev_default_boot', )),
167        (cros_repair.CrosRebootRepair, 'reboot', ('ssh', ), (
168                'devmode',
169                'writable',
170        )),
171        (cros_repair.EnrollmentCleanupRepair, 'cleanup_enrollment', ('ssh', ),
172         ('enrollment_state', )),
173        (cros_repair.JetstreamTpmRepair, 'jetstream_tpm_repair',
174         ('ping', 'ssh', 'writable', 'tpm', 'good_provision', 'ext4'),
175         ('power', 'rwfw', 'fwstatus', 'python', 'hwid', 'cros',
176          'dev_default_boot', 'jetstream_tpm', 'jetstream_attestation')),
177        (cros_repair.JetstreamServiceRepair, 'jetstream_service_repair',
178         ('ping', 'ssh', 'writable', 'tpm', 'good_provision', 'ext4',
179          'jetstream_tpm', 'jetstream_attestation'),
180         ('power', 'rwfw', 'fwstatus', 'python', 'hwid', 'cros',
181          'dev_default_boot', 'jetstream_tpm', 'jetstream_attestation',
182          'jetstream_services')),
183        (cros_repair.ProvisionRepair, 'provision',
184         ('ping', 'ssh', 'writable', 'tpm', 'good_provision',
185          'ext4'), ('power', 'rwfw', 'fwstatus', 'python', 'hwid', 'cros',
186                    'dev_default_boot', 'jetstream_tpm',
187                    'jetstream_attestation', 'jetstream_services')),
188        (cros_repair.PowerWashRepair, 'powerwash', ('ping', 'ssh', 'writable'),
189         ('tpm', 'good_provision', 'ext4', 'power', 'rwfw', 'fwstatus',
190          'python', 'hwid', 'cros', 'dev_default_boot', 'jetstream_tpm',
191          'jetstream_attestation', 'jetstream_services')),
192        (cros_repair.ServoInstallRepair, 'usb', ('usb_drive', ), (
193                'ping',
194                'ssh',
195                'writable',
196                'tpm',
197                'good_provision',
198                'ext4',
199                'power',
200                'rwfw',
201                'fwstatus',
202                'python',
203                'hwid',
204                'cros',
205                'dev_default_boot',
206                'jetstream_tpm',
207                'jetstream_attestation',
208                'jetstream_services',
209                'faft_tpm',
210        )),
211)
212
213TPM_STATUS_OWNED = """
214Message Reply: [tpm_manager.GetTpmNonsensitiveStatusReply] {
215  status: STATUS_SUCCESS
216  is_enabled: true
217  is_owned: true
218  is_owner_password_present: true
219  has_reset_lock_permissions: true
220  is_srk_default_auth: true
221}
222"""
223
224TPM_STATUS_NOT_OWNED = """
225Message Reply: [tpm_manager.GetTpmNonsensitiveStatusReply] {
226  status: STATUS_SUCCESS
227  is_enabled: true
228  is_owned: false
229  is_owner_password_present: false
230  has_reset_lock_permissions: false
231  is_srk_default_auth: true
232}
233"""
234
235TPM_STATUS_CANNOT_LOAD_SRK = """
236Message Reply: [tpm_manager.GetTpmNonsensitiveStatusReply] {
237  status: STATUS_SUCCESS
238  is_enabled: true
239  is_owned: true
240  is_owner_password_present: false
241  has_reset_lock_permissions: false
242  is_srk_default_auth: false
243}
244"""
245
246TPM_STATUS_READY = """
247TPM Enabled: true
248TPM Owned: true
249TPM Being Owned: false
250TPM Ready: true
251TPM Password: 9eaee4da8b4c
252"""
253
254TPM_STATUS_NOT_READY = """
255TPM Enabled: true
256TPM Owned: false
257TPM Being Owned: true
258TPM Ready: false
259TPM Password:
260"""
261
262
263class CrosRepairUnittests(unittest.TestCase):
264    # pylint: disable=missing-docstring
265
266    maxDiff = None
267
268    def test_cros_repair(self):
269        verify_dag = cros_repair._cros_verify_dag()
270        self.assertTupleEqual(verify_dag, CROS_VERIFY_DAG)
271        self.check_verify_dag(verify_dag)
272        repair_actions = cros_repair._cros_repair_actions()
273        self.assertTupleEqual(repair_actions, CROS_REPAIR_ACTIONS)
274        self.check_repair_actions(verify_dag, repair_actions)
275
276    def test_moblab_repair(self):
277        verify_dag = cros_repair._moblab_verify_dag()
278        self.assertTupleEqual(verify_dag, MOBLAB_VERIFY_DAG)
279        self.check_verify_dag(verify_dag)
280        repair_actions = cros_repair._moblab_repair_actions()
281        self.assertTupleEqual(repair_actions, MOBLAB_REPAIR_ACTIONS)
282        self.check_repair_actions(verify_dag, repair_actions)
283
284    def test_jetstream_repair(self):
285        verify_dag = cros_repair._jetstream_verify_dag()
286        self.assertTupleEqual(verify_dag, JETSTREAM_VERIFY_DAG)
287        self.check_verify_dag(verify_dag)
288        repair_actions = cros_repair._jetstream_repair_actions()
289        self.assertTupleEqual(repair_actions, JETSTREAM_REPAIR_ACTIONS)
290        self.check_repair_actions(verify_dag, repair_actions)
291
292    def check_verify_dag(self, verify_dag):
293        """Checks that dependency labels are defined."""
294        labels = [n[1] for n in verify_dag]
295        for node in verify_dag:
296            for dep in node[2]:
297                self.assertIn(dep, labels)
298
299    def check_repair_actions(self, verify_dag, repair_actions):
300        """Checks that dependency and trigger labels are defined."""
301        verify_labels = [n[1] for n in verify_dag]
302        for action in repair_actions:
303            deps = action[2]
304            triggers = action[3]
305            for label in deps + triggers:
306                self.assertIn(label, verify_labels)
307
308    def test_get_tpm_status_owned(self):
309        mock_host = mock.Mock()
310        mock_host.run.return_value.stdout = TPM_STATUS_OWNED
311        status = cros_repair.TpmStatus(mock_host)
312        self.assertTrue(status.tpm_enabled)
313        self.assertTrue(status.tpm_owned)
314        self.assertTrue(status.tpm_can_load_srk)
315        self.assertTrue(status.tpm_can_load_srk_pubkey)
316
317    def test_get_tpm_status_not_owned(self):
318        mock_host = mock.Mock()
319        mock_host.run.return_value.stdout = TPM_STATUS_NOT_OWNED
320        status = cros_repair.TpmStatus(mock_host)
321        self.assertTrue(status.tpm_enabled)
322        self.assertFalse(status.tpm_owned)
323        self.assertFalse(status.tpm_can_load_srk)
324        self.assertFalse(status.tpm_can_load_srk_pubkey)
325
326    @mock.patch.object(cros_repair, '_is_virtual_machine')
327    def test_tpm_status_verifier_owned(self, mock_is_virt):
328        mock_is_virt.return_value = False
329        mock_host = mock.Mock()
330        mock_host.run.return_value.stdout = TPM_STATUS_OWNED
331        tpm_verifier = cros_repair.TPMStatusVerifier('test', [])
332        tpm_verifier.verify(mock_host)
333
334    @mock.patch.object(cros_repair, '_is_virtual_machine')
335    def test_tpm_status_verifier_not_owned(self, mock_is_virt):
336        mock_is_virt.return_value = False
337        mock_host = mock.Mock()
338        mock_host.run.return_value.stdout = TPM_STATUS_NOT_OWNED
339        tpm_verifier = cros_repair.TPMStatusVerifier('test', [])
340        tpm_verifier.verify(mock_host)
341
342    @mock.patch.object(cros_repair, '_is_virtual_machine')
343    def test_tpm_status_verifier_cannot_load_srk_pubkey(self, mock_is_virt):
344        mock_is_virt.return_value = False
345        mock_host = mock.Mock()
346        mock_host.run.return_value.stdout = TPM_STATUS_CANNOT_LOAD_SRK
347        tpm_verifier = cros_repair.TPMStatusVerifier('test', [])
348        with self.assertRaises(hosts.AutoservVerifyError) as ctx:
349            tpm_verifier.verify(mock_host)
350        self.assertEqual('Cannot load the TPM SRK', str(ctx.exception))
351
352    def test_jetstream_tpm_owned(self):
353        mock_host = mock.Mock()
354        mock_host.run.side_effect = [
355                mock.Mock(stdout=TPM_STATUS_OWNED),
356                mock.Mock(stdout=TPM_STATUS_READY),
357        ]
358        tpm_verifier = cros_repair.JetstreamTpmVerifier('test', [])
359        tpm_verifier.verify(mock_host)
360
361    @mock.patch.object(retry.logging, 'warning')
362    @mock.patch.object(retry.time, 'time')
363    @mock.patch.object(retry.time, 'sleep')
364    def test_jetstream_tpm_not_owned(self, mock_sleep, mock_time, mock_logging):
365        mock_time.side_effect = itertools.count(0, 20)
366        mock_host = mock.Mock()
367        mock_host.run.return_value.stdout = TPM_STATUS_NOT_OWNED
368        tpm_verifier = cros_repair.JetstreamTpmVerifier('test', [])
369        with self.assertRaises(hosts.AutoservVerifyError) as ctx:
370            tpm_verifier.verify(mock_host)
371        self.assertEqual('TPM is not owned', str(ctx.exception))
372
373    @mock.patch.object(retry.logging, 'warning')
374    @mock.patch.object(retry.time, 'time')
375    @mock.patch.object(retry.time, 'sleep')
376    def test_jetstream_tpm_not_ready(self, mock_sleep, mock_time, mock_logging):
377        mock_time.side_effect = itertools.count(0, 20)
378        mock_host = mock.Mock()
379        mock_host.run.side_effect = itertools.cycle([
380                mock.Mock(stdout=TPM_STATUS_OWNED),
381                mock.Mock(stdout=TPM_STATUS_NOT_READY),
382        ])
383        tpm_verifier = cros_repair.JetstreamTpmVerifier('test', [])
384        with self.assertRaises(hosts.AutoservVerifyError) as ctx:
385            tpm_verifier.verify(mock_host)
386        self.assertEqual('TPM is not ready', str(ctx.exception))
387
388    @mock.patch.object(retry.logging, 'warning')
389    @mock.patch.object(retry.time, 'time')
390    @mock.patch.object(retry.time, 'sleep')
391    def test_jetstream_tpm_missing(self, mock_sleep, mock_time, mock_logging):
392        mock_time.side_effect = itertools.count(0, 20)
393        mock_host = mock.Mock()
394        mock_host.run.side_effect = error.AutoservRunError('test', None)
395        tpm_verifier = cros_repair.JetstreamTpmVerifier('test', [])
396        with self.assertRaises(hosts.AutoservVerifyError) as ctx:
397            tpm_verifier.verify(mock_host)
398        self.assertEqual('Could not determine TPM status', str(ctx.exception))
399
400
401if __name__ == '__main__':
402    unittest.main()
403