xref: /aosp_15_r20/external/autotest/client/cros/cellular/test_environment.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import contextlib
7import dbus
8import logging
9import sys
10import time
11import traceback
12
13import common
14from autotest_lib.client.bin import local_host
15from autotest_lib.client.bin import utils
16from autotest_lib.client.common_lib import error
17from autotest_lib.client.common_lib.cros import crash_detector
18from autotest_lib.client.cros import upstart
19from autotest_lib.client.cros.cellular import mm
20from autotest_lib.client.cros.cellular import mm1_constants
21from autotest_lib.client.cros.networking import cellular_proxy
22from autotest_lib.client.cros.networking import mm1_proxy
23from autotest_lib.client.cros.networking import shill_context
24from autotest_lib.client.cros.networking import shill_proxy
25
26
27class CellularTestEnvironment(object):
28    """Setup and verify cellular test environment.
29
30    This context manager configures the following:
31        - Shuts down other devices except cellular.
32        - Shill and MM logging is enabled appropriately for cellular.
33        - Initializes members that tests should use to access test environment
34          (eg. |shill|, |modem_manager|, |modem|).
35        - modemfwd is stopped to prevent the modem from rebooting underneath
36          us.
37
38    Then it verifies the following is valid:
39        - The SIM is inserted and valid.
40        - There is one and only one modem in the device.
41        - The modem is registered to the network.
42        - There is a cellular service in shill and it's not connected.
43
44    Don't use this base class directly, use the appropriate subclass.
45
46    Setup for over-the-air tests:
47        with CellularOTATestEnvironment() as test_env:
48            # Test body
49
50    Setup for pseudomodem tests:
51        with CellularPseudoMMTestEnvironment(
52                pseudomm_args=({'family': '3GPP'})) as test_env:
53            # Test body
54
55    """
56
57    def __init__(self,
58                 shutdown_other_devices=True,
59                 modem_pattern='',
60                 skip_modem_reset=False,
61                 is_esim_test=False,
62                 enable_temp_containments=True):
63        """
64        @param shutdown_other_devices: If True, shutdown all devices except
65                cellular.
66        @param modem_pattern: Search string used when looking for the modem.
67        @param enable_temp_containments: Enable temporary containments to avoid
68                failures on tests with known problems.
69
70        """
71        # Tests should use this main loop instead of creating their own.
72        self.mainloop = dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
73        self.bus = dbus.SystemBus(mainloop=self.mainloop)
74
75        self.shill = None
76        self.modem_manager = None
77        self.modem = None
78        self.modem_path = None
79
80        self._modem_pattern = modem_pattern
81        self._skip_modem_reset = skip_modem_reset
82        self._is_esim_test = is_esim_test
83        self._enable_temp_containments = enable_temp_containments
84        self._system_service_order = ''
85        self._test_service_order = 'cellular,ethernet'
86
87        self._nested = None
88        self._context_managers = []
89        self.detect_crash = crash_detector.CrashDetector(
90                local_host.LocalHost())
91        self.detect_crash.remove_crash_files()
92        if shutdown_other_devices:
93            self._context_managers.append(
94                    shill_context.AllowedTechnologiesContext([
95                            shill_proxy.ShillProxy.TECHNOLOGY_CELLULAR,
96                            shill_proxy.ShillProxy.TECHNOLOGY_ETHERNET
97                    ]))
98
99    @contextlib.contextmanager
100    def _disable_shill_autoconnect(self):
101        self._enable_shill_cellular_autoconnect(False)
102        yield
103        self._enable_shill_cellular_autoconnect(True)
104
105    def __enter__(self):
106        try:
107            # Wait for system daemons to stabilize before beginning the test.
108            # Modemfwd, Chrome, Shill and Hermes might be active before the test
109            # begins, and interrupting them abruptly during test setup might
110            # lead to flaky tests. The modem might also appear/disappear
111            # multiple times during this period. Ideally, we would wait for a
112            # green signal from these daemons before performing test setup.
113            with open('/proc/uptime') as uptime_file:
114                uptime = float(uptime_file.readline().split()[0])
115            if uptime < 60:
116                logging.info(
117                        "Waiting %.1f seconds to reach uptime of 1 minute before "
118                        "starting test", 60 - uptime)
119                time.sleep(60 - uptime)
120
121            if upstart.has_service('modemfwd') and upstart.is_running('modemfwd'):
122                # Due to b/179796133, stopping modemfwd right after it was
123                # started by a previous test, can wedge the modem. In many
124                # devices, a ~1 second delay solves the problem.
125                time.sleep(4)
126                upstart.stop_job('modemfwd')
127            # Temporarily disable shill autoconnect to cellular service while
128            # the test environment is setup to prevent a race condition
129            # between disconnecting the modem in _verify_cellular_service()
130            # and shill autoconnect.
131            with self._disable_shill_autoconnect():
132                try:
133                    from contextlib import nested # Python 2
134                except ImportError:
135                    from contextlib import ExitStack, contextmanager
136
137                    @contextmanager
138                    def nested(*contexts):
139                        """ Implementation of nested for python3"""
140                        with ExitStack() as stack:
141                            for ctx in contexts:
142                                stack.enter_context(ctx)
143                            yield contexts
144
145                self._nested = nested(*self._context_managers)
146
147                self._nested.__enter__()
148
149                self._initialize_shill()
150
151                # Perform SIM verification now to ensure that we can enable the
152                # modem in _initialize_modem_components(). ModemManager does not
153                # allow enabling a modem without a SIM.
154                self._verify_sim()
155                self._initialize_modem_components()
156
157                self._setup_logging()
158
159                if not self._is_esim_test:
160                    self._wait_for_modem_registration()
161                self._verify_cellular_service()
162
163                return self
164        except (error.TestError, dbus.DBusException,
165                shill_proxy.ShillProxyError) as e:
166            except_type, except_value, except_traceback = sys.exc_info()
167            lines = traceback.format_exception(except_type, except_value,
168                                               except_traceback)
169            logging.error('Error during test initialization:\n%s',
170                          ''.join(lines))
171            self.__exit__(*sys.exc_info())
172            raise error.TestError('INIT_ERROR: %s' % str(e))
173        except:
174            self.__exit__(*sys.exc_info())
175            raise
176
177    def __exit__(self, exception, value, traceback):
178        exception_on_restore_state = None
179        try:
180            self._restore_state()
181        except Exception as ex:
182            # Exceptions thrown by _restore_state() should be ignored if a
183            # previous exception exist, otherwise the root cause of the test
184            # failure will be overwritten by the clean up error in
185            # _restore_state, and that is not useful.
186            if exception is None:
187                exception_on_restore_state = ex
188
189        # If a test fails and a crash is detected, the crash error takes
190        # priority over the previous failure.
191        crash_files = self.detect_crash.get_new_crash_files()
192        if any(cf for cf in crash_files if any(pr in cf for pr in [
193                'ModemManager', 'shill', 'qmi', 'mbim', 'hermes', 'modemfwd'
194        ])):
195            logging.info(
196                    'A crash was encountered. '
197                    'Overriding the previous error: %s', value)
198            raise error.TestError(
199                    'One or more daemon crashes were detected. '
200                    'See crash dumps: {}'.format(crash_files))
201
202        if exception_on_restore_state is not None:
203            raise exception_on_restore_state
204
205        if self._nested:
206            return self._nested.__exit__(exception, value, traceback)
207        self.shill = None
208        self.modem_manager = None
209        self.modem = None
210        self.modem_path = None
211
212    def _restore_state(self):
213        """Try to restore the test environment to a good state.
214        """
215        if upstart.has_service('modemfwd'):
216            upstart.restart_job('modemfwd')
217        if self.shill:
218            self._set_service_order(self._system_service_order)
219
220    def _get_shill_cellular_device_object(self):
221        return utils.poll_for_condition(
222            lambda: self.shill.find_cellular_device_object(),
223            exception=error.TestError('Cannot find cellular device in shill. '
224                                      'Is the modem plugged in?'),
225            timeout=shill_proxy.ShillProxy.DEVICE_ENUMERATION_TIMEOUT)
226
227    def _get_service_order(self):
228        """Get the shill service order.
229
230        @return string service order on success, None otherwise.
231
232        """
233        return str(self.shill.manager.GetServiceOrder())
234
235    def _set_service_order(self, order):
236        """Set the shill service order.
237
238        @param order string comma-delimited service order
239        (eg. 'cellular,ethernet')
240        @return bool True on success, False otherwise.
241
242        """
243        self.shill.manager.SetServiceOrder(dbus.String(order))
244        return True
245
246    def _enable_modem(self):
247        modem_device = self._get_shill_cellular_device_object()
248        try:
249            modem_device.Enable()
250        except dbus.DBusException as e:
251            if (e.get_dbus_name() !=
252                    shill_proxy.ShillProxy.ERROR_IN_PROGRESS):
253                raise
254
255        utils.poll_for_condition(
256            lambda: modem_device.GetProperties()['Powered'],
257            exception=error.TestError(
258                'Failed to enable modem.'),
259            timeout=shill_proxy.ShillProxy.DEVICE_ENABLE_DISABLE_TIMEOUT)
260
261    def _enable_shill_cellular_autoconnect(self, enable):
262        shill = cellular_proxy.CellularProxy.get_proxy(self.bus)
263        shill.manager.SetProperty(
264            shill_proxy.ShillProxy.
265            MANAGER_PROPERTY_NO_AUTOCONNECT_TECHNOLOGIES,
266            '' if enable else 'cellular')
267
268    def _is_unsupported_error(self, e):
269        return (e.get_dbus_name() ==
270                shill_proxy.ShillProxy.ERROR_NOT_SUPPORTED or
271                (e.get_dbus_name() ==
272                 shill_proxy.ShillProxy.ERROR_FAILURE and
273                 'operation not supported' in e.get_dbus_message()))
274
275    def _reset_modem(self):
276        modem_device = self._get_shill_cellular_device_object()
277        try:
278            # MBIM modems do not support being reset.
279            self.shill.reset_modem(modem_device, expect_service=False)
280        except dbus.DBusException as e:
281            if not self._is_unsupported_error(e):
282                raise
283
284    def _initialize_shill(self):
285        """Get access to shill."""
286        # CellularProxy.get_proxy() checks to see if shill is running and
287        # responding to DBus requests. It returns None if that's not the case.
288        self.shill = cellular_proxy.CellularProxy.get_proxy(self.bus)
289        if self.shill is None:
290            raise error.TestError('Cannot connect to shill, is shill running?')
291
292        self._system_service_order = self._get_service_order()
293        self._set_service_order(self._test_service_order)
294
295    def _initialize_modem_components(self):
296        """Reset the modem and get access to modem components."""
297        # Enable modem first so shill initializes the modemmanager proxies so
298        # we can call reset on it.
299        self._enable_modem()
300        if not self._skip_modem_reset:
301            self._reset_modem()
302
303        # PickOneModem() makes sure there's a modem manager and that there is
304        # one and only one modem.
305        self.modem_manager, self.modem_path = \
306            mm.PickOneModem(self._modem_pattern)
307        self.modem = self.modem_manager.GetModem(self.modem_path)
308        if self.modem is None:
309            raise error.TestError('Cannot get modem object at %s.' %
310                                  self.modem_path)
311
312    def _setup_logging(self):
313        self.shill.set_logging_for_cellular_test()
314        self.modem_manager.SetDebugLogging()
315
316    def _verify_sim(self):
317        """Verify SIM is valid.
318
319        Make sure a SIM in inserted and that it is not locked.
320
321        @raise error.TestError if SIM does not exist or is locked.
322
323        """
324        # check modem SIM slot and properties and switch slot as needed
325        modem_proxy = self._check_for_modem_with_sim()
326        if modem_proxy is None:
327            raise error.TestError('There is no Modem with non empty SIM path.')
328
329        modem_device = self._get_shill_cellular_device_object()
330        props = modem_device.GetProperties()
331
332        # No SIM in CDMA modems.
333        family = props[
334            cellular_proxy.CellularProxy.DEVICE_PROPERTY_TECHNOLOGY_FAMILY]
335        if (family ==
336                cellular_proxy.CellularProxy.
337                DEVICE_PROPERTY_TECHNOLOGY_FAMILY_CDMA):
338            return
339
340        # Make sure there is a SIM.
341        if not props[cellular_proxy.CellularProxy.DEVICE_PROPERTY_SIM_PRESENT]:
342            raise error.TestError('There is no SIM in the modem.')
343
344        # Make sure SIM is not locked.
345        lock_status = props.get(
346            cellular_proxy.CellularProxy.DEVICE_PROPERTY_SIM_LOCK_STATUS,
347            None)
348        if lock_status is None:
349            raise error.TestError('Failed to read SIM lock status.')
350        locked = lock_status.get(
351            cellular_proxy.CellularProxy.PROPERTY_KEY_SIM_LOCK_ENABLED,
352            None)
353        if locked is None:
354            raise error.TestError('Failed to read SIM LockEnabled status.')
355        elif locked:
356            raise error.TestError(
357                'SIM is locked, test requires an unlocked SIM.')
358
359    def _check_for_modem_with_sim(self):
360        """
361        Make sure modem got active SIM and path is not empty
362
363        switch slot to get non empty sim path and active sim slot for modem
364
365        @return active modem object or None
366
367        """
368        mm_proxy = mm1_proxy.ModemManager1Proxy.get_proxy()
369        if mm_proxy is None:
370            raise error.TestError('Modem manager is not initialized')
371
372        modem_proxy = mm_proxy.wait_for_modem(mm1_constants.MM_MODEM_POLL_TIME)
373        if modem_proxy is None:
374            raise error.TestError('Modem not initialized')
375
376        primary_slot = modem_proxy.get_primary_sim_slot()
377        # Get SIM path from modem SIM properties
378        modem_props = modem_proxy.properties(mm1_constants.I_MODEM)
379        sim_path = modem_props['Sim']
380
381        logging.info('Device SIM values=> path:%s '
382                'primary slot:%d', sim_path, primary_slot)
383
384        def is_usable_sim(path):
385            """Check if sim at path can be used to establish a connection"""
386            if path == mm1_constants.MM_EMPTY_SLOT_PATH:
387                return False
388            sim_proxy = modem_proxy.get_sim_at_path(path)
389            sim_props = sim_proxy.properties()
390            return sim_props[
391                    'EsimStatus'] != mm1_constants.MM_SIM_ESIM_STATUS_NO_PROFILES
392
393        # Check current SIM path value and status
394        if is_usable_sim(sim_path):
395            return modem_proxy
396
397        slots = modem_props['SimSlots']
398        logging.info('Dut not in expected state, '
399                    'current sim path:%s slots:%s', sim_path, slots)
400
401        for idx, path in enumerate(slots):
402            if not is_usable_sim(path):
403                continue
404            logging.info('Primary slot does not have a SIM, '
405                        'switching slot to %d', idx+1)
406
407            if (primary_slot != idx + 1):
408                logging.info('setting slot:%d path:%s', idx+1, path)
409                modem_proxy.set_primary_slot(idx+1)
410                modem_proxy = \
411                    mm_proxy.wait_for_modem(mm1_constants.MM_MODEM_POLL_TIME)
412                return modem_proxy
413        return None
414
415    def _wait_for_modem_registration(self):
416        """Wait for the modem to register with the network.
417
418        @raise error.TestError if modem is not registered.
419
420        """
421        utils.poll_for_condition(
422            self.modem.ModemIsRegistered,
423            exception=error.TestError(
424                'Modem failed to register with the network.'),
425            timeout=cellular_proxy.CellularProxy.SERVICE_REGISTRATION_TIMEOUT)
426
427    def _verify_cellular_service(self):
428        """Make sure a cellular service exists.
429
430        The cellular service should not be connected to the network.
431
432        @raise error.TestError if cellular service does not exist or if
433                there are multiple cellular services.
434
435        """
436        service = self.shill.wait_for_cellular_service_object()
437
438        try:
439            service.Disconnect()
440        except dbus.DBusException as e:
441            if (e.get_dbus_name() !=
442                    cellular_proxy.CellularProxy.ERROR_NOT_CONNECTED):
443                raise
444        success, state, _ = self.shill.wait_for_property_in(
445            service,
446            cellular_proxy.CellularProxy.SERVICE_PROPERTY_STATE,
447            ('idle',),
448            cellular_proxy.CellularProxy.SERVICE_DISCONNECT_TIMEOUT)
449        if not success:
450            raise error.TestError(
451                'Cellular service needs to start in the "idle" state. '
452                'Current state is "%s". '
453                'Modem disconnect may have failed.' %
454                state)
455
456
457class CellularOTATestEnvironment(CellularTestEnvironment):
458    """Setup and verify cellular over-the-air (OTA) test environment. """
459
460    def __init__(self, **kwargs):
461        super(CellularOTATestEnvironment, self).__init__(**kwargs)
462
463# pseudomodem tests disabled with b/180627893, cleaningup all pseudomodem
464# related files and imports through: b/205769777
465'''
466class CellularPseudoMMTestEnvironment(CellularTestEnvironment):
467    """Setup and verify cellular pseudomodem test environment. """
468
469    def __init__(self, pseudomm_args=None, **kwargs):
470        """
471        @param pseudomm_args: Tuple of arguments passed to the pseudomodem, see
472                pseudomodem_context.py for description of each argument in the
473                tuple: (flags_map, block_output, bus)
474
475        """
476        kwargs["skip_modem_reset"] = True
477        super(CellularPseudoMMTestEnvironment, self).__init__(**kwargs)
478        self._context_managers.append(
479            pseudomodem_context.PseudoModemManagerContext(
480                True, bus=self.bus, *pseudomm_args))
481'''
482
483class CellularESIMTestEnvironment(CellularTestEnvironment):
484    """Setup cellular eSIM test environment. """
485
486    def __init__(self, esim_arguments=None, **kwargs):
487        kwargs["skip_modem_reset"] = True
488        kwargs["is_esim_test"] = True
489        super(CellularESIMTestEnvironment, self).__init__(**kwargs)
490