1# Lint as: python2, python3
2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6# AU tests use ToT client code, but ToT -3 client version.
7try:
8    from gi.repository import GObject
9except ImportError:
10    import gobject as GObject
11import logging
12import time
13
14from autotest_lib.client.bin import test
15from autotest_lib.client.common_lib import error
16from autotest_lib.client.cros.mainloop import ExceptionForward
17from autotest_lib.client.cros.mainloop import GenericTesterMainLoop
18from autotest_lib.client.cros.networking import shill_proxy
19
20DEFAULT_TEST_TIMEOUT_S = 600
21
22
23class DisableTester(GenericTesterMainLoop):
24  """Base class containing main test logic."""
25  def __init__(self, *args, **kwargs):
26    super(DisableTester, self).__init__(*args, **kwargs)
27
28  @ExceptionForward
29  def perform_one_test(self):
30    """Called by GenericMainTesterMainLoop to execute the test."""
31    self._configure()
32    disable_delay_ms = (
33        self.test_kwargs.get('delay_before_disable_ms', 0) +
34        self.test.iteration *
35        self.test_kwargs.get('disable_delay_per_iteration_ms', 0))
36    GObject.timeout_add(disable_delay_ms, self._start_disable)
37    self._start_test()
38
39  @ExceptionForward
40  def _connect_success_handler(self, *ignored_args):
41    logging.info('connect succeeded')
42    self.requirement_completed('connect')
43
44  @ExceptionForward
45  def _connect_error_handler(self, e):
46    # We disabled while connecting; error is OK
47    logging.info('connect errored: %s', e)
48    self.requirement_completed('connect')
49
50  @ExceptionForward
51  def _start_disable(self):
52    logging.info('disabling')
53    self.disable_start = time.time()
54    self._enable(False)
55
56  @ExceptionForward
57  def _disable_success_handler(self):
58    disable_elapsed = time.time() - self.disable_start
59    self.requirement_completed('disable')
60
61  @ExceptionForward
62  def _get_status_success_handler(self, status):
63    logging.info('Got status')
64    self.requirement_completed('get_status', warn_if_already_completed=False)
65    if self.status_delay_ms:
66      GObject.timeout_add(self.status_delay_ms, self._start_get_status)
67
68  def after_main_loop(self):
69    """Called by GenericTesterMainLoop after the main loop has exited."""
70    enabled = self._enabled()
71    logging.info('Modem enabled: %s', enabled)
72
73
74class ShillDisableTester(DisableTester):
75  """Tests that disable-while-connecting works at the shill level.
76  Expected control flow:
77
78  * self._configure() called; registers self._disable_property_changed
79    to be called when device is en/disabled
80
81  * Parent class sets a timer that calls self._enable(False) when it expires.
82
83  * _start_test calls _start_connect() which sends a connect request to
84    the device.
85
86  * we wait for the modem to power off, at which point
87    _disable_property_changed (registered above) will get called
88
89  * _disable_property_changed() completes the 'disable' requirement,
90    and we're done.
91
92  """
93  def __init__(self, *args, **kwargs):
94    super(ShillDisableTester, self).__init__(*args, **kwargs)
95
96  def _disable_property_changed(self, property, value, *args, **kwargs):
97    self._disable_success_handler()
98
99  def _start_test(self):
100    # We would love to add requirements based on connect, but in many
101    # scenarios, there is no observable response to a cancelled
102    # connect: We issue a connect, it returns instantly to let us know
103    # that the connect has started, but then the disable takes effect
104    # and the connect fails.  We don't get a state change because no
105    # state change has happened: the modem never got to a different
106    # state before we cancelled
107    self.remaining_requirements = set(['disable'])
108    self._start_connect()
109
110  def _configure(self):
111    self.cellular_device = \
112        self.test.test_env.shill.find_cellular_device_object()
113    if self.cellular_device is None:
114      raise error.TestError("Could not find cellular device")
115
116    self.cellular_service = \
117        self.test.test_env.shill.find_cellular_service_object()
118
119    self.test.test_env.bus.add_signal_receiver(
120            self.dispatch_property_changed,
121            signal_name='PropertyChanged',
122            dbus_interface=self.cellular_device.dbus_interface,
123            path=self.cellular_device.object_path)
124
125  @ExceptionForward
126  def _expect_einprogress_handler(self, e):
127    pass
128
129  def _enable(self, value):
130    self.property_changed_actions['Powered'] = self._disable_property_changed
131
132    if value:
133      self.cellular_device.Enable(
134          reply_handler=self.ignore_handler,
135          error_handler=self._expect_einprogress_handler)
136    else:
137      self.cellular_device.Disable(
138          reply_handler=self.ignore_handler,
139          error_handler=self._expect_einprogress_handler)
140
141  @ExceptionForward
142  def _start_connect(self):
143    logging.info('connecting')
144
145    def _log_connect_event(property, value, *ignored_args):
146      logging.info('%s property changed: %s', property, value)
147
148    self.property_changed_actions['Connected'] = _log_connect_event
149
150    # Contrary to documentation, Connect just returns when it has
151    # fired off the lower-level dbus messages.  So a success means
152    # nothing to us.  But a failure means it didn't even try.
153    self.cellular_service.Connect(
154        reply_handler=self.ignore_handler,
155        error_handler=self.build_error_handler('Connect'))
156
157  def _enabled(self):
158    return self.test.test_env.shill.get_dbus_property(
159            self.cellular_device,
160            shill_proxy.ShillProxy.DEVICE_PROPERTY_POWERED)
161
162
163class ModemDisableTester(DisableTester):
164  """Tests that disable-while-connecting works at the modem-manager level.
165
166  Expected control flow:
167
168  * _configure() is called.
169
170  * Parent class sets a timer that calls self._enable(False) when it
171    expires.
172
173  * _start_test calls _start_connect() which sends a connect request to
174    the device, also sets a timer that calls GetStatus on the modem.
175
176  * wait for all three (connect, disable, get_status) to complete.
177
178  """
179  def __init__(self, *args, **kwargs):
180    super(ModemDisableTester, self).__init__(*args, **kwargs)
181
182  def _is_gobi(self):
183    return 'Gobi' in self.test.test_env.modem.path
184
185  def _start_test(self):
186    self.remaining_requirements = set(['connect', 'disable'])
187
188    # Only cromo/gobi-cromo-plugin maintain the invariant that GetStatus
189    # will always succeed, so we only check it if the modem is a Gobi.
190    if self._is_gobi():
191      self.remaining_requirements.add('get_status')
192      self.status_delay_ms = self.test_kwargs.get('status_delay_ms', 200)
193      GObject.timeout_add(self.status_delay_ms, self._start_get_status)
194
195    self._start_connect()
196
197  def _configure(self):
198    self.simple_modem = self.test.test_env.modem.SimpleModem()
199
200    logging.info('Modem path: %s', self.test.test_env.modem.path)
201
202    if self._is_gobi():
203      self._configure_gobi()
204    else:
205      self._configure_non_gobi()
206
207    service = self.test.test_env.shill.wait_for_cellular_service_object()
208    if not service:
209      raise error.TestError('Modem failed to register with the network after '
210                            're-enabling.')
211
212  def _configure_gobi(self):
213    gobi_modem = self.test.test_env.modem.GobiModem()
214
215    if 'async_connect_sleep_ms' in self.test_kwargs:
216      sleep_ms = self.test_kwargs.get('async_connect_sleep_ms', 0)
217      logging.info('Sleeping %d ms before connect', sleep_ms)
218      gobi_modem.InjectFault('AsyncConnectSleepMs', sleep_ms)
219
220    if 'connect_fails_with_error_sending_qmi_request' in self.test_kwargs:
221      logging.info('Injecting QMI failure')
222      gobi_modem.InjectFault('ConnectFailsWithErrorSendingQmiRequest', 1)
223
224  def _configure_non_gobi(self):
225    # Check to make sure no Gobi-specific arguments were specified.
226    if 'async_connect_sleep_ms' in self.test_kwargs:
227      raise error.TestError('async_connect_sleep_ms on non-Gobi modem')
228    if 'connect_fails_with_error_sending_qmi_request' in self.test_kwargs:
229      raise error.TestError(
230          'connect_fails_with_error_sending_qmi_request on non-Gobi modem')
231
232  @ExceptionForward
233  def _start_connect(self):
234    logging.info('connecting')
235
236    retval = self.simple_modem.Connect(
237        {},
238        reply_handler=self._connect_success_handler,
239        error_handler=self._connect_error_handler)
240    logging.info('connect call made.  retval = %s', retval)
241
242
243  @ExceptionForward
244  def _start_get_status(self):
245    # Keep on calling get_status to make sure it works at all times
246    self.simple_modem.GetStatus(
247        reply_handler=self._get_status_success_handler,
248        error_handler=self.build_error_handler('GetStatus'))
249
250  def _enabled(self):
251    return self.test.test_env.modem.GetModemProperties().get('Enabled', -1)
252
253  def _enable(self, value):
254    self.test.test_env.modem.Enable(
255        value,
256        reply_handler=self._disable_success_handler,
257        error_handler=self.build_error_handler('Enable'))
258
259
260class cellular_DisableWhileConnecting(test.test):
261  """Check that the modem can handle a disconnect while connecting."""
262  version = 1
263
264  def run_once(self, test_env, **kwargs):
265    self.test_env = test_env
266    timeout_s = kwargs.get('timeout_s', DEFAULT_TEST_TIMEOUT_S)
267    gobject_main_loop = GObject.MainLoop()
268
269    with test_env:
270      logging.info('Shill-level test')
271      shill_level_test = ShillDisableTester(self,
272                                            gobject_main_loop,
273                                            timeout_s=timeout_s)
274      shill_level_test.run(**kwargs)
275
276    with test_env:
277      logging.info('Modem-level test')
278      modem_level_test = ModemDisableTester(self,
279                                            gobject_main_loop,
280                                            timeout_s=timeout_s)
281      modem_level_test.run(**kwargs)
282