# Lint as: python2, python3
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

# AU tests use ToT client code, but ToT -3 client version.
try:
    from gi.repository import GObject
except ImportError:
    import gobject as GObject
import logging
import time

from autotest_lib.client.bin import test
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros.mainloop import ExceptionForward
from autotest_lib.client.cros.mainloop import GenericTesterMainLoop
from autotest_lib.client.cros.networking import shill_proxy

# Default value for the 'timeout_s' test argument, in seconds.
DEFAULT_TEST_TIMEOUT_S = 600


class DisableTester(GenericTesterMainLoop):
    """Base class containing main test logic.

    Subclasses are expected to provide _configure(), _start_test(),
    _enable(value) and _enabled(); this class schedules the disable and
    supplies the shared completion handlers.
    """

    def __init__(self, *args, **kwargs):
        super(DisableTester, self).__init__(*args, **kwargs)

    @ExceptionForward
    def perform_one_test(self):
        """Called by GenericTesterMainLoop to execute the test.

        Configures the tester, schedules the disable after a delay and then
        starts the subclass-specific test. The delay is
        'delay_before_disable_ms' plus 'disable_delay_per_iteration_ms'
        multiplied by the current test iteration (both default to 0).
        """
        self._configure()
        base_delay_ms = self.test_kwargs.get('delay_before_disable_ms', 0)
        per_iteration_ms = self.test_kwargs.get(
                'disable_delay_per_iteration_ms', 0)
        disable_delay_ms = (
                base_delay_ms + self.test.iteration * per_iteration_ms)
        # The timer is armed before the test starts so that the disable is
        # already pending when the connect is issued.
        GObject.timeout_add(disable_delay_ms, self._start_disable)
        self._start_test()

    @ExceptionForward
    def _connect_success_handler(self, *ignored_args):
        """Mark the 'connect' requirement complete after a successful connect.

        @param ignored_args: Reply arguments; unused.
        """
        logging.info('connect succeeded')
        self.requirement_completed('connect')

    @ExceptionForward
    def _connect_error_handler(self, e):
        """Mark the 'connect' requirement complete after a failed connect.

        @param e: The error reported for the connect call.
        """
        # We disabled while connecting; error is OK
        logging.info('connect errored: %s', e)
        self.requirement_completed('connect')

    @ExceptionForward
    def _start_disable(self):
        """Timer callback: record the start time and request a disable."""
        logging.info('disabling')
        self.disable_start = time.time()
        self._enable(False)

    @ExceptionForward
    def _disable_success_handler(self):
        """Log how long the disable took and complete 'disable'."""
        disable_elapsed = time.time() - self.disable_start
        logging.info('disable completed in %.3f seconds', disable_elapsed)
        self.requirement_completed('disable')

    @ExceptionForward
    def _get_status_success_handler(self, status):
        """Complete 'get_status' and schedule the next status poll.

        @param status: The status reply; unused.
        """
        logging.info('Got status')
        # This handler runs once per poll, so repeated completion is expected.
        self.requirement_completed('get_status',
                                   warn_if_already_completed=False)
        if self.status_delay_ms:
            GObject.timeout_add(self.status_delay_ms, self._start_get_status)

    def after_main_loop(self):
        """Called by GenericTesterMainLoop after the main loop has exited."""
        enabled = self._enabled()
        logging.info('Modem enabled: %s', enabled)


class ShillDisableTester(DisableTester):
    """Tests that disable-while-connecting works at the shill level.

    Expected control flow:

    * self._configure() called; registers self.dispatch_property_changed
      as the receiver for the device's PropertyChanged signal.

    * Parent class sets a timer that calls self._enable(False) when it
      expires. _enable() registers self._disable_property_changed as the
      action for the 'Powered' property.

    * _start_test calls _start_connect() which sends a connect request to
      the device.

    * we wait for the modem to power off, at which point
      _disable_property_changed (registered above) will get called

    * _disable_property_changed() completes the 'disable' requirement,
      and we're done.

    """

    def __init__(self, *args, **kwargs):
        super(ShillDisableTester, self).__init__(*args, **kwargs)

    def _disable_property_changed(self, property, value, *args, **kwargs):
        """Treat a 'Powered' property change as completion of the disable.

        @param property: Name of the changed property; unused.
        @param value: New value of the property; unused.
        """
        self._disable_success_handler()

    def _start_test(self):
        """Require only 'disable' and fire off the connect request."""
        # We would love to add requirements based on connect, but in many
        # scenarios, there is no observable response to a cancelled
        # connect: We issue a connect, it returns instantly to let us know
        # that the connect has started, but then the disable takes effect
        # and the connect fails. We don't get a state change because no
        # state change has happened: the modem never got to a different
        # state before we cancelled
        self.remaining_requirements = {'disable'}
        self._start_connect()

    def _configure(self):
        """Look up the cellular device and service and hook up signals.

        @raises error.TestError: if the cellular device or the cellular
                service cannot be found.
        """
        shill = self.test.test_env.shill

        self.cellular_device = shill.find_cellular_device_object()
        if self.cellular_device is None:
            raise error.TestError("Could not find cellular device")

        self.cellular_service = shill.find_cellular_service_object()
        # Fail here with a clear message; a missing service would otherwise
        # only show up as an attribute error when _start_connect() calls
        # Connect on it.
        if self.cellular_service is None:
            raise error.TestError("Could not find cellular service")

        self.test.test_env.bus.add_signal_receiver(
                self.dispatch_property_changed,
                signal_name='PropertyChanged',
                dbus_interface=self.cellular_device.dbus_interface,
                path=self.cellular_device.object_path)

    @ExceptionForward
    def _expect_einprogress_handler(self, e):
        """Tolerate an error reply from the Enable/Disable call.

        The error is logged rather than silently dropped; it does not fail
        the test.

        @param e: The error reported for the Enable/Disable call.
        """
        logging.info('enable/disable call returned an error: %s', e)

    def _enable(self, value):
        """Asynchronously enable or disable the cellular device.

        @param value: True to enable the device, False to disable it.
        """
        self.property_changed_actions['Powered'] = (
                self._disable_property_changed)

        if value:
            request = self.cellular_device.Enable
        else:
            request = self.cellular_device.Disable
        request(reply_handler=self.ignore_handler,
                error_handler=self._expect_einprogress_handler)

    @ExceptionForward
    def _start_connect(self):
        """Send an asynchronous Connect request to the cellular service."""
        logging.info('connecting')

        def _log_connect_event(property, value, *ignored_args):
            logging.info('%s property changed: %s', property, value)

        self.property_changed_actions['Connected'] = _log_connect_event

        # Contrary to documentation, Connect just returns when it has
        # fired off the lower-level dbus messages. So a success means
        # nothing to us. But a failure means it didn't even try.
        self.cellular_service.Connect(
                reply_handler=self.ignore_handler,
                error_handler=self.build_error_handler('Connect'))

    def _enabled(self):
        """Return the device's 'Powered' property as reported by shill."""
        return self.test.test_env.shill.get_dbus_property(
                self.cellular_device,
                shill_proxy.ShillProxy.DEVICE_PROPERTY_POWERED)


class ModemDisableTester(DisableTester):
    """Tests that disable-while-connecting works at the modem-manager level.

    Expected control flow:

    * _configure() is called.

    * Parent class sets a timer that calls self._enable(False) when it
      expires.

    * _start_test calls _start_connect() which sends a connect request to
      the device, also sets a timer that calls GetStatus on the modem
      (Gobi modems only).

    * wait for all requirements (connect, disable and, on Gobi modems,
      get_status) to complete.

    """

    def __init__(self, *args, **kwargs):
        super(ModemDisableTester, self).__init__(*args, **kwargs)

    def _is_gobi(self):
        """Return True if the modem's path contains 'Gobi'."""
        return 'Gobi' in self.test.test_env.modem.path

    def _start_test(self):
        """Set up the requirements, start status polling, then connect."""
        self.remaining_requirements = {'connect', 'disable'}

        # Only cromo/gobi-cromo-plugin maintain the invariant that GetStatus
        # will always succeed, so we only check it if the modem is a Gobi.
        if self._is_gobi():
            self.remaining_requirements.add('get_status')
            self.status_delay_ms = self.test_kwargs.get('status_delay_ms',
                                                        200)
            GObject.timeout_add(self.status_delay_ms, self._start_get_status)

        self._start_connect()

    def _configure(self):
        """Prepare the modem proxies and wait for a cellular service.

        @raises error.TestError: if Gobi-only arguments are given for a
                non-Gobi modem, or if no cellular service object shows up.
        """
        self.simple_modem = self.test.test_env.modem.SimpleModem()

        logging.info('Modem path: %s', self.test.test_env.modem.path)

        if self._is_gobi():
            self._configure_gobi()
        else:
            self._configure_non_gobi()

        service = self.test.test_env.shill.wait_for_cellular_service_object()
        if not service:
            raise error.TestError('Modem failed to register with the network after '
                                  're-enabling.')

    def _configure_gobi(self):
        """Inject the Gobi faults requested through the test arguments."""
        gobi_modem = self.test.test_env.modem.GobiModem()

        if 'async_connect_sleep_ms' in self.test_kwargs:
            sleep_ms = self.test_kwargs['async_connect_sleep_ms']
            logging.info('Sleeping %d ms before connect', sleep_ms)
            gobi_modem.InjectFault('AsyncConnectSleepMs', sleep_ms)

        if 'connect_fails_with_error_sending_qmi_request' in self.test_kwargs:
            logging.info('Injecting QMI failure')
            gobi_modem.InjectFault('ConnectFailsWithErrorSendingQmiRequest', 1)

    def _configure_non_gobi(self):
        """Reject Gobi-specific test arguments on a non-Gobi modem.

        @raises error.TestError: if a Gobi-only argument was specified.
        """
        # Check to make sure no Gobi-specific arguments were specified.
        if 'async_connect_sleep_ms' in self.test_kwargs:
            raise error.TestError('async_connect_sleep_ms on non-Gobi modem')
        if 'connect_fails_with_error_sending_qmi_request' in self.test_kwargs:
            raise error.TestError(
                    'connect_fails_with_error_sending_qmi_request on non-Gobi modem')

    @ExceptionForward
    def _start_connect(self):
        """Send an asynchronous Connect request through the simple modem."""
        logging.info('connecting')

        retval = self.simple_modem.Connect(
                {},
                reply_handler=self._connect_success_handler,
                error_handler=self._connect_error_handler)
        logging.info('connect call made. retval = %s', retval)

    @ExceptionForward
    def _start_get_status(self):
        """Issue an asynchronous GetStatus call on the simple modem."""
        # Keep on calling get_status to make sure it works at all times
        self.simple_modem.GetStatus(
                reply_handler=self._get_status_success_handler,
                error_handler=self.build_error_handler('GetStatus'))

    def _enabled(self):
        """Return the modem's 'Enabled' property, or -1 if it is absent."""
        return self.test.test_env.modem.GetModemProperties().get('Enabled', -1)

    def _enable(self, value):
        """Asynchronously enable or disable the modem.

        @param value: True to enable the modem, False to disable it.
        """
        self.test.test_env.modem.Enable(
                value,
                reply_handler=self._disable_success_handler,
                error_handler=self.build_error_handler('Enable'))


class cellular_DisableWhileConnecting(test.test):
    """Check that the modem can handle a disconnect while connecting."""
    version = 1

    def run_once(self, test_env, **kwargs):
        """Run the shill-level test, then the modem-level test.

        @param test_env: Test environment; it is entered as a context
                manager once for each of the two tests.
        @param kwargs: Test arguments forwarded to both testers. 'timeout_s'
                overrides DEFAULT_TEST_TIMEOUT_S.
        """
        self.test_env = test_env
        timeout_s = kwargs.get('timeout_s', DEFAULT_TEST_TIMEOUT_S)
        gobject_main_loop = GObject.MainLoop()

        with test_env:
            logging.info('Shill-level test')
            shill_level_test = ShillDisableTester(self,
                                                  gobject_main_loop,
                                                  timeout_s=timeout_s)
            shill_level_test.run(**kwargs)

        with test_env:
            logging.info('Modem-level test')
            modem_level_test = ModemDisableTester(self,
                                                  gobject_main_loop,
                                                  timeout_s=timeout_s)
            modem_level_test.run(**kwargs)