1# Lint as: python2, python3
2# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import logging
7import time
8
9from autotest_lib.client.bin import utils
10from autotest_lib.client.common_lib import error
11from autotest_lib.server import test
12
# Upper bound, in seconds, on how long the test polls for a stable modem state.
_MODEM_WAIT_DELAY = 120

# Label used when no modem state could be obtained from the DUT.
NO_MODEM_STATE_AVAILABLE = 'FAILED TO GET MODEM STATE'

# TODO(harpreet / benchan): Modify the modem script to report modem health.
# crbug.com/352351

# Modem states for non-Gobi modems. The values are the consecutive integers
# -1 through 11, kept as strings so they can be compared directly against
# the 'State' field parsed from the 'modem status' output.
(
    MM_MODEM_STATE_FAILED,         # '-1'
    MM_MODEM_STATE_UNKNOWN,        # '0'
    MM_MODEM_STATE_INITIALIZING,   # '1'
    MM_MODEM_STATE_LOCKED,         # '2'
    MM_MODEM_STATE_DISABLED,       # '3'
    MM_MODEM_STATE_DISABLING,      # '4'
    MM_MODEM_STATE_ENABLING,       # '5'
    MM_MODEM_STATE_ENABLED,        # '6'
    MM_MODEM_STATE_SEARCHING,      # '7'
    MM_MODEM_STATE_REGISTERED,     # '8'
    MM_MODEM_STATE_DISCONNECTING,  # '9'
    MM_MODEM_STATE_CONNECTING,     # '10'
    MM_MODEM_STATE_CONNECTED,      # '11'
) = map(str, range(-1, 12))

# Modem states for Gobi modems. The values are the multiples of ten from 0
# through 90, also kept as strings.
(
    GOBI_MODEM_STATE_UNKNOWN,        # '0'
    GOBI_MODEM_STATE_DISABLED,       # '10'
    GOBI_MODEM_STATE_DISABLING,      # '20'
    GOBI_MODEM_STATE_ENABLING,       # '30'
    GOBI_MODEM_STATE_ENABLED,        # '40'
    GOBI_MODEM_STATE_SEARCHING,      # '50'
    GOBI_MODEM_STATE_REGISTERED,     # '60'
    GOBI_MODEM_STATE_DISCONNECTING,  # '70'
    GOBI_MODEM_STATE_CONNECTING,     # '80'
    GOBI_MODEM_STATE_CONNECTED,      # '90'
) = map(str, range(0, 100, 10))

# "Enabled" state of either modem flavour. The test adds these to the stable
# states when the modem is not expected to auto-register.
ENABLED_MODEM_STATES = [MM_MODEM_STATE_ENABLED, GOBI_MODEM_STATE_ENABLED]

# States in which a non-Gobi modem is considered testable.
MM_STABLE_MODEM_STATES = [
    MM_MODEM_STATE_DISABLED,
    MM_MODEM_STATE_REGISTERED,
    MM_MODEM_STATE_CONNECTED,
]

# States in which a Gobi modem is considered testable.
GOBI_STABLE_MODEM_STATES = [
    GOBI_MODEM_STATE_DISABLED,
    GOBI_MODEM_STATE_REGISTERED,
    GOBI_MODEM_STATE_CONNECTED,
]
60
class cellular_StaleModemReboot(test.test):
    """
    Uses servo to cold reboot the device if modem is not available or is not in
    testable state.

    The test attempts to get modem status by running the 'modem status' command
    on the DUT. If it is unsuccessful in getting the modem status or the modem
    is in a bad state, it will try to reboot the DUT. If the modem is already
    in a testable state, the DUT is left alone.

    """

    version = 1

    # Names of the non-Gobi modem states. The name of numeric state N is
    # stored at index N + 1 because the first state (FAILED) is -1.
    _MM_STATE_NAMES = (
        'FAILED',
        'UNKNOWN',
        'INITIALIZING',
        'LOCKED',
        'DISABLED',
        'DISABLING',
        'ENABLING',
        'ENABLED',
        'SEARCHING',
        'REGISTERED',
        'DISCONNECTING',
        'CONNECTING',
        'CONNECTED',
    )

    # Names of the Gobi modem states. Gobi states are multiples of ten, so
    # the name of numeric state N is stored at index N / 10.
    _GOBI_STATE_NAMES = (
        'UNKNOWN',
        'DISABLED',
        'DISABLING',
        'ENABLING',
        'ENABLED',
        'SEARCHING',
        'REGISTERED',
        'DISCONNECTING',
        'CONNECTING',
        'CONNECTED',
    )

    def _modem_state_to_string(self, state, is_gobi):
        """Takes the numerical modem state and returns associated state name.

        This is only used for log messages, so a state that cannot be mapped
        to a name is reported as unrecognized instead of raising.

        @param state: The state of the modem on the device, as the string
                found in the 'State' field of the modem status (or None).
        @param is_gobi: True if the device has a gobi modem.
        @return The name of the state associated with the numeric value,
                NO_MODEM_STATE_AVAILABLE if state is empty or None, or an
                'UNRECOGNIZED MODEM STATE (...)' label if the value does not
                correspond to a known state.

        """
        if not state:
            return NO_MODEM_STATE_AVAILABLE

        unrecognized = 'UNRECOGNIZED MODEM STATE (%s)' % (state,)

        try:
            value = int(state)
        except (TypeError, ValueError):
            return unrecognized

        if is_gobi:
            index, remainder = divmod(value, 10)
            if remainder:
                return unrecognized
            names = self._GOBI_STATE_NAMES
        else:
            index = value + 1
            names = self._MM_STATE_NAMES

        # Explicit bounds check: a negative index must not silently wrap
        # around to the end of the table.
        if 0 <= index < len(names):
            return names[index]
        return unrecognized


    def _format_modem_status(self, modem_status):
        """Formats the modem status data and inserts it into a dictionary.

        Each line is split at its first colon into a key and a value, both
        stripped of surrounding whitespace. Lines without a colon are stored
        with an empty string as value.

        @param modem_status: Command line output of 'modem status'.
        @return modem status dictionary, or None if modem_status is empty or
                None.

        """
        if not modem_status:
            return None

        modem_status_dict = {}
        for line in modem_status.splitlines():
            # Split on the first colon only so that values which themselves
            # contain a colon are kept whole.
            key, _, value = line.partition(':')
            modem_status_dict[key.strip()] = value.strip()

        return modem_status_dict


    def _run_modem_status(self):
        """Runs the 'modem status' command on the DUT.

        @return The stripped stdout of the command, or None if running the
                command raised AutoservRunError.

        """
        try:
            return self._client.run('modem status').stdout.strip()
        except error.AutoservRunError as e:
            logging.debug("AutoservRunError is: %s", e)
            return None


    def _get_modem_status(self):
        """Gets the status of the modem by running 'modem status' command.

        @return modem_status_dict: is the dictionary of all the lines returned
                by modem status command, or None if the command failed or
                produced no output.

        """
        return self._format_modem_status(self._run_modem_status())


    def _get_modem_state(self):
        """Gets the current modem state from the modem status.

        @return The value of the 'State' field of the modem status, or None
                if the status is unavailable or has no 'State' field.

        """
        modem_status_dict = self._get_modem_status()

        if not modem_status_dict:
            return None

        return modem_status_dict.get('State')


    def _select_stable_modem_states(self, expect_auto_registration):
        """Detects the modem flavour and sets self.STABLE_MODEM_STATES.

        The modem is treated as gobi if the 'modem status' output contains
        'gobi' (case insensitive). If the status cannot be obtained the modem
        is treated as a non-gobi modem.

        @param expect_auto_registration: If False, the enabled states are
                accepted as stable states as well.
        @return True if the device has a gobi modem, False otherwise.

        """
        raw_status = self._run_modem_status() or ''
        is_gobi = 'gobi' in raw_status.lower()

        if is_gobi:
            stable_states = list(GOBI_STABLE_MODEM_STATES)
        else:
            stable_states = list(MM_STABLE_MODEM_STATES)

        # stable_states is a copy, so extending it does not modify the
        # module-level lists shared between test runs.
        if not expect_auto_registration:
            stable_states.extend(ENABLED_MODEM_STATES)

        self.STABLE_MODEM_STATES = stable_states
        return is_gobi


    def _cold_reset_dut(self, boot_id):
        """Power cycles the DUT via servo and waits for it to come back.

        After the DUT has restarted, waits for the modem to reach a stable
        state (see _wait_for_stable_modem_state).

        @param boot_id: The boot id of the DUT before the power cycle, passed
                to wait_for_restart as old_boot_id.

        """
        self._client.power_off_via_servo()
        self._client.power_on_via_servo()
        time.sleep(self._servo.BOOT_DELAY)
        self._client.wait_for_restart(old_boot_id=boot_id)
        self._wait_for_stable_modem_state()


    def _wait_for_stable_modem_state(self):
        """
        Wait for a maximum of _MODEM_WAIT_DELAY seconds for the modem to get
        into stable state. Also, because we do not want the test to terminate
        in case there is an exception, we are catching and logging the exception
        so the test can continue with rebooting the device again as needed.

        """
        try:
            utils.poll_for_condition(
                  lambda: self._get_modem_state() in self.STABLE_MODEM_STATES,
                  exception=utils.TimeoutError('Modem not in stable state '
                                               'after %s seconds.' %
                                               _MODEM_WAIT_DELAY),
                  timeout=_MODEM_WAIT_DELAY,
                  sleep_interval=5)
        except utils.TimeoutError as e:
            logging.debug("Stable modem state TimeoutError is: %s", e)


    def run_once(self, host, tries=2, expect_auto_registration=True):
        """
        Runs the test.

        If the host has no servo the test returns without doing anything. If
        the modem is already in a stable state the DUT is not rebooted.

        @param host: A host object representing the DUT.
        @param tries: Number of additional reboots performed after the first
                reboot did not bring the modem into a testable state, i.e.
                the DUT is rebooted at most tries + 1 times. Default number
                of tries is 2, which is set in the control file.
        @param expect_auto_registration: To be used with an exceptional modem
                that does not auto-register by passing False from modem specific
                control file.
        @raise error.TestFail if modem cannot be brought to a testable state
                or if it is in locked state after a reboot.

        """

        self._client = host
        self._servo = host.servo

        if not self._servo:
            logging.info('Host %s does not have a servo.', host.hostname)
            return

        gobi = self._select_stable_modem_states(expect_auto_registration)

        original_modem_state = self._get_modem_state()

        logging.info('Modem state before reboot on host %s: %s',
                     host.hostname,
                     self._modem_state_to_string(original_modem_state, gobi))

        # Only reboot when the modem is unavailable or in a bad state.
        if original_modem_state in self.STABLE_MODEM_STATES:
            logging.info('Modem is already in testable state. '
                         'No reboot needed.')
            return

        num_tries = 0

        while True:
            # Read the boot id before every power cycle so that
            # wait_for_restart is given the id of the boot being replaced.
            boot_id = self._client.get_boot_id()
            self._cold_reset_dut(boot_id)

            # The modem status may have been unavailable before the reboot,
            # so detect the modem flavour again now that the DUT is back.
            gobi = self._select_stable_modem_states(expect_auto_registration)

            new_modem_state = self._get_modem_state()
            if new_modem_state in self.STABLE_MODEM_STATES:
                logging.info('Modem is in testable state: %s',
                             self._modem_state_to_string(new_modem_state, gobi))
                break
            if new_modem_state == MM_MODEM_STATE_LOCKED:
                raise error.TestFail('Modem in locked state.')
            # '>=' rather than '==' so that a negative tries value cannot
            # make this loop run forever.
            if num_tries >= tries:
                logging.info('Modem still in bad state after %s reboot tries '
                             'on host %s. Modem state: %s ',
                             num_tries + 1, host.hostname,
                             self._modem_state_to_string(new_modem_state, gobi))
                raise error.TestFail('Modem is not in testable state')
            num_tries += 1
255