xref: /aosp_15_r20/external/autotest/client/cros/cellular/pseudomodem/pseudomodem.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6# This module is the entry point for pseudomodem. Though honestly, I can't think
7# of any case when you want to use this module directly. Instead, use the
8# |pseudomodem_context| module that provides a way to launch pseudomodem in a
9# child process.
10
11from __future__ import absolute_import
12from __future__ import division
13from __future__ import print_function
14
15import argparse
16import dbus
17import dbus.mainloop.glib
18# AU tests use ToT client code, but ToT -3 client version.
19try:
20    from gi.repository import GObject
21except ImportError:
22    import gobject as GObject
23import imp
24import json
25import logging
26import os
27import os.path
28import signal
29import sys
30import traceback
31from six.moves import range
32
33import common
34
35from autotest_lib.client.cros.cellular import mm1_constants
36from autotest_lib.client.cros.cellular.pseudomodem import testing
37from autotest_lib.client.cros.cellular.pseudomodem import logging_setup
38from autotest_lib.client.cros.cellular.pseudomodem import modem_cdma
39from autotest_lib.client.cros.cellular.pseudomodem import modem_3gpp
40from autotest_lib.client.cros.cellular.pseudomodem import modemmanager
41from autotest_lib.client.cros.cellular.pseudomodem import sim
42from autotest_lib.client.cros.cellular.pseudomodem import state_machine_factory as smf
43
44# Flags used by pseudomodem modules only that are defined below in
45# ParserArguments.
46CLI_FLAG = '--cli'
47EXIT_ERROR_FILE_FLAG = '--exit-error-file'
48
49class PseudoModemManager(object):
50    """
51    The main class to be used to launch the pseudomodem.
52
53    There should be only one instance of this class that orchestrates
54    pseudomodem.
55
56    """
57
58    def Setup(self, opts):
59        """
60        Call |Setup| to prepare pseudomodem to be launched.
61
62        @param opts: The options accepted by pseudomodem. See top level function
63                |ParseArguments| for details.
64
65        """
66        self._opts = opts
67
68        self._in_exit_sequence = False
69        self._manager = None
70        self._modem = None
71        self._state_machine_factory = None
72        self._sim = None
73        self._mainloop = None
74
75        self._dbus_loop = dbus.mainloop.glib.DBusGMainLoop()
76        self._bus = dbus.SystemBus(private=True, mainloop=self._dbus_loop)
77        self._bus_name = dbus.service.BusName(mm1_constants.I_MODEM_MANAGER,
78                                              self._bus)
79        logging.info('Exported dbus service with well known name: |%s|',
80                     self._bus_name.get_name())
81
82        self._SetupPseudomodemParts()
83        logging.info('Pseudomodem setup completed!')
84
85
86    def StartBlocking(self):
87        """
88        Start pseudomodem operation.
89
90        This call blocks untill |GracefulExit| is called from some other
91        context.
92
93        """
94        self._mainloop = GObject.MainLoop()
95        self._mainloop.run()
96
97
98    def GracefulExit(self):
99        """ Stop pseudomodem operation and clean up. """
100        if self._in_exit_sequence:
101            logging.debug('Already exiting.')
102            return
103
104        self._in_exit_sequence = True
105        logging.info('pseudomodem shutdown sequence initiated...')
106        # Guard each step by its own try...catch, because we want to attempt
107        # each step irrespective of whether the earlier ones succeeded.
108        try:
109            if self._manager:
110                self._manager.Remove(self._modem)
111        except Exception as e:
112            logging.warning('Error while exiting: %s', repr(e))
113        try:
114            if self._mainloop:
115                self._mainloop.quit()
116        except Exception as e:
117            logging.warning('Error while exiting: %s', repr(e))
118
119        logging.info('pseudomodem: Bye! Bye!')
120
121
122    def _SetupPseudomodemParts(self):
123        """
124        Contructs all pseudomodem objects, but does not start operation.
125
126        Three main objects are created: the |Modem|, the |Sim|, and the
127        |StateMachineFactory|. This objects may be instantiations of the default
128        classes, or of user provided classes, depending on options provided.
129
130        """
131        self._ReadCustomParts()
132
133        use_3gpp = (self._opts.family == '3GPP')
134
135        if not self._modem and not self._state_machine_factory:
136            self._state_machine_factory = smf.StateMachineFactory()
137            logging.info('Created default state machine factory.')
138
139        if use_3gpp and not self._sim:
140            self._sim = sim.SIM(sim.SIM.Carrier('test'),
141                                mm1_constants.MM_MODEM_ACCESS_TECHNOLOGY_GSM,
142                                locked=self._opts.locked)
143            logging.info('Created default 3GPP SIM.')
144
145        # Store this constant here because the variable name is too long.
146        network_available = dbus.types.UInt32(
147                mm1_constants.MM_MODEM_3GPP_NETWORK_AVAILABILITY_AVAILABLE)
148        if not self._modem:
149            if use_3gpp:
150                technology_gsm = dbus.types.UInt32(
151                        mm1_constants.MM_MODEM_ACCESS_TECHNOLOGY_GSM)
152                networks = [modem_3gpp.Modem3gpp.GsmNetwork(
153                        'Roaming Network Long ' + str(i),
154                        'Roaming Network Short ' + str(i),
155                        '00100' + str(i + 1),
156                        network_available,
157                        technology_gsm)
158                        for i in range(self._opts.roaming_networks)]
159                # TODO(armansito): Support "not activated" initialization option
160                # for 3GPP carriers.
161                self._modem = modem_3gpp.Modem3gpp(
162                        self._state_machine_factory,
163                        roaming_networks=networks)
164                logging.info('Created default 3GPP modem.')
165            else:
166                self._modem = modem_cdma.ModemCdma(
167                        self._state_machine_factory,
168                        modem_cdma.ModemCdma.CdmaNetwork(
169                                activated=self._opts.activated))
170                logging.info('Created default CDMA modem.')
171
172        # Everyone gets the |_bus|, woohoo!
173        self._manager = modemmanager.ModemManager(self._bus)
174        self._modem.SetBus(self._bus)  # Also sets it on StateMachineFactory.
175        self._manager.Add(self._modem)
176
177        # Unfortunately, setting the SIM has to be deferred until everyone has
178        # their BUS set. |self._sim| exists if the user provided one, or if the
179        # modem family is |3GPP|.
180        if self._sim:
181            self._modem.SetSIM(self._sim)
182
183        # The testing interface can be brought up now that we have the bus.
184        self._testing_object = testing.Testing(self._modem, self._bus)
185
186
187    def _ReadCustomParts(self):
188        """
189        Loads user provided implementations of pseudomodem objects.
190
191        The user can provide their own implementations of the |Modem|, |Sim| or
192        |StateMachineFactory| classes.
193
194        """
195        if not self._opts.test_module:
196            return
197
198        test_module = self._LoadCustomPartsModule(self._opts.test_module)
199
200        if self._opts.test_modem_class:
201            self._modem = self._CreateCustomObject(test_module,
202                                                   self._opts.test_modem_class,
203                                                   self._opts.test_modem_arg)
204
205        if self._opts.test_sim_class:
206            self._sim = self._CreateCustomObject(test_module,
207                                                 self._opts.test_sim_class,
208                                                 self._opts.test_sim_arg)
209
210        if self._opts.test_state_machine_factory_class:
211            if self._opts.test_modem_class:
212                logging.warning(
213                        'User provided a |Modem| implementation as well as a '
214                        '|StateMachineFactory|. Ignoring the latter.')
215            else:
216                self._state_machine_factory = self._CreateCustomObject(
217                        test_module,
218                        self._opts.test_state_machine_factory_class,
219                        self._opts.test_state_machine_factory_arg)
220
221
222    def _CreateCustomObject(self, test_module, class_name, arg_file_name):
223        """
224        Create the custom object specified by test.
225
226        @param test_module: The loaded module that implemets the custom object.
227        @param class_name: Name of the class implementing the custom object.
228        @param arg_file_name: Absolute path to file containing list of arguments
229                taken by |test_module|.|class_name| constructor in json.
230        @returns: A brand new object of the custom type.
231        @raises: AttributeError if the class definition is not found;
232                ValueError if |arg_file| does not contain valid json
233                representaiton of a python list.
234                Other errors may be raised during object creation.
235
236        """
237        arg = None
238        if arg_file_name:
239            arg_file = open(arg_file_name, 'rb')
240            try:
241                arg = json.load(arg_file)
242            finally:
243                arg_file.close()
244            if not isinstance(arg, list):
245                raise ValueError('Argument must be a python list.')
246
247        class_def = getattr(test_module, class_name)
248        try:
249            if arg:
250                logging.debug('Loading test class %s%s',
251                              class_name, str(arg))
252                return class_def(*arg)
253            else:
254                logging.debug('Loading test class %s', class_def)
255                return class_def()
256        except Exception as e:
257            logging.error('Exception raised when instantiating class %s: %s',
258                          class_name, str(e))
259            raise
260
261
262    def _LoadCustomPartsModule(self, module_abs_path):
263        """
264        Loads the given file as a python module.
265
266        The loaded module *is* added to |sys.modules|.
267
268        @param module_abs_path: Absolute path to the file to be loaded.
269        @returns: The loaded module.
270        @raises: ImportError if the module can not be loaded, or if another
271                 module with the same name is already loaded.
272
273        """
274        path, name = os.path.split(module_abs_path)
275        name, _ = os.path.splitext(name)
276
277        if name in sys.modules:
278            raise ImportError('A module named |%s| is already loaded.' %
279                              name)
280
281        logging.debug('Loading module %s from %s', name, path)
282        module_file, filepath, data = imp.find_module(name, [path])
283        try:
284            module = imp.load_module(name, module_file, filepath, data)
285        except Exception as e:
286            logging.error(
287                    'Exception raised when loading test module from %s: %s',
288                    module_abs_path, str(e))
289            raise
290        finally:
291            module_file.close()
292        return module
293
294
295# ##############################################################################
296# Public static functions.
297def ParseArguments(arg_string=None):
298    """
299    The main argument parser.
300
301    Pseudomodem is a command line tool.
302    Since pseudomodem is a highly customizable tool, the command line arguments
303    are expected to be quite complex.
304    We use argparse to keep the command line options easy to use.
305
306    @param arg_string: If not None, the string to parse. If none, |sys.argv| is
307            used to obtain the argument string.
308    @returns: The parsed options object.
309
310    """
311    parser = argparse.ArgumentParser(
312            description="Run pseudomodem to simulate a modem using the "
313                        "modemmanager-next DBus interface.")
314
315    parser.add_argument(
316            CLI_FLAG,
317            action='store_true',
318            default=False,
319            help='Launch the command line interface in foreground to interact '
320                 'with the launched pseudomodem process. This argument is used '
321                 'by |pseudomodem_context|. pseudomodem itself ignores it.')
322    parser.add_argument(
323            EXIT_ERROR_FILE_FLAG,
324            default=None,
325            help='If provided, full path to file to which pseudomodem should '
326                 'dump the error condition before exiting, in case of a crash. '
327                 'The file is not created if it does not already exist.')
328
329    modem_arguments = parser.add_argument_group(
330            title='Modem options',
331            description='Options to customize the modem exported.')
332    modem_arguments.add_argument(
333            '--family', '-f',
334            choices=['3GPP', 'CDMA'],
335            default='3GPP')
336
337    gsm_arguments = parser.add_argument_group(
338            title='3GPP options',
339            description='Options specific to 3GPP modems. [Only make sense '
340                        'when modem family is 3GPP]')
341
342    gsm_arguments.add_argument(
343            '--roaming-networks', '-r',
344            type=_NonNegInt,
345            default=0,
346            metavar='<# networks>',
347            help='Number of roaming networks available')
348
349    cdma_arguments = parser.add_argument_group(
350            title='CDMA options',
351            description='Options specific to CDMA modems. [Only make sense '
352                        'when modem family is CDMA]')
353
354    sim_arguments = parser.add_argument_group(
355            title='SIM options',
356            description='Options to customize the SIM in the modem. [Only make '
357                        'sense when modem family is 3GPP]')
358    sim_arguments.add_argument(
359            '--activated',
360            type=bool,
361            default=True,
362            help='Determine whether the SIM is activated')
363    sim_arguments.add_argument(
364            '--locked', '-l',
365            type=bool,
366            default=False,
367            help='Determine whether the SIM is in locked state')
368
369    testing_arguments = parser.add_argument_group(
370            title='Testing interface options',
371            description='Options to modify how the tests or user interacts '
372                        'with pseudomodem')
373    testing_arguments = parser.add_argument(
374            '--interactive-state-machines-all',
375            type=bool,
376            default=False,
377            help='Launch all state machines in interactive mode.')
378    testing_arguments = parser.add_argument(
379            '--interactive-state-machine',
380            type=str,
381            default=None,
382            help='Launch the specified state machine in interactive mode. May '
383                 'be repeated to specify multiple machines.')
384
385    customize_arguments = parser.add_argument_group(
386            title='Customizable modem options',
387            description='Options to customize the emulated modem.')
388    customize_arguments.add_argument(
389            '--test-module',
390            type=str,
391            default=None,
392            metavar='CUSTOM_MODULE',
393            help='Absolute path to the module with custom definitions.')
394    customize_arguments.add_argument(
395            '--test-modem-class',
396            type=str,
397            default=None,
398            metavar='MODEM_CLASS',
399            help='Name of the class in CUSTOM_MODULE that implements the modem '
400                 'to load.')
401    customize_arguments.add_argument(
402            '--test-modem-arg',
403            type=str,
404            default=None,
405            help='Absolute path to the json description of argument list '
406                 'taken by MODEM_CLASS.')
407    customize_arguments.add_argument(
408            '--test-sim-class',
409            type=str,
410            default=None,
411            metavar='SIM_CLASS',
412            help='Name of the class in CUSTOM_MODULE that implements the SIM '
413                 'to load.')
414    customize_arguments.add_argument(
415            '--test-sim-arg',
416            type=str,
417            default=None,
418            help='Aboslute path to the json description of argument list '
419                 'taken by SIM_CLASS')
420    customize_arguments.add_argument(
421            '--test-state-machine-factory-class',
422            type=str,
423            default=None,
424            metavar='SMF_CLASS',
425            help='Name of the class in CUSTOM_MODULE that impelements the '
426                 'state machine factory to load. Only used if MODEM_CLASS is '
427                 'not provided.')
428    customize_arguments.add_argument(
429            '--test-state-machine-factory-arg',
430            type=str,
431            default=None,
432            help='Absolute path to the json description of argument list '
433                 'taken by SMF_CLASS')
434
435    opts = parser.parse_args(arg_string)
436
437    # Extra confidence checks.
438    if opts.family == 'CDMA' and opts.roaming_networks > 0:
439        raise argparse.ArgumentTypeError('CDMA networks do not support '
440                                         'roaming networks.')
441
442    test_objects = (opts.test_modem_class or
443                    opts.test_sim_class or
444                    opts.test_state_machine_factory_class)
445    if not opts.test_module and test_objects:
446        raise argparse.ArgumentTypeError('test_module is required with any '
447                                         'other customization arguments.')
448
449    if opts.test_modem_class and opts.test_state_machine_factory_class:
450        logging.warning('test-state-machine-factory-class will be ignored '
451                        'because test-modem-class was provided.')
452
453    return opts
454
455
456def ExtractExitError(dump_file_path):
457    """
458    Gets the exit error left behind by a crashed pseudomodem.
459
460    If there is a file at |dump_file_path|, extracts the error and the traceback
461    left behind by the child process. This function is intended to be used by
462    the launching process to parse the error file left behind by pseudomodem.
463
464    @param dump_file_path: Full path to the file to read.
465    @returns: (error_reason, error_traceback)
466            error_reason: str. The one line reason for error that should be
467                    used to raise exceptions.
468            error_traceback: A list of str. This is the traceback left
469                    behind by the child process, if any. May be [].
470
471    """
472    error_reason = 'No detailed reason found :('
473    error_traceback = []
474    if dump_file_path:
475        try:
476            dump_file = open(dump_file_path, 'rb')
477            error_reason = dump_file.readline().strip()
478            error_traceback = dump_file.readlines()
479            dump_file.close()
480        except OSError as e:
481            logging.error('Could not open dump file %s: %s',
482                          dump_file_path, str(e))
483    return error_reason, error_traceback
484
485
486# The single global instance of PseudoModemManager.
487_pseudo_modem_manager = None
488
489
490# ##############################################################################
491# Private static functions.
492def _NonNegInt(value):
493    value = int(value)
494    if value < 0:
495        raise argparse.ArgumentTypeError('%s is not a non-negative int' % value)
496    return value
497
498
499def _DumpExitError(dump_file_path, exc):
500    """
501    Dump information about the raised exception in the exit error file.
502
503    Format of file dumped:
504    - First line is the reason for the crash.
505    - Subsequent lines are the traceback from the exception raised.
506
507    We expect the file to exist, because we want the launching context (that
508    will eventually read the error dump) to create and own the file.
509
510    @param dump_file_path: Full path to file to which we should dump.
511    @param exc: The exception raised.
512
513    """
514    if not dump_file_path:
515        return
516
517    if not os.path.isfile(dump_file_path):
518        logging.error('File |%s| does not exist. Can not dump exit error.',
519                      dump_file_path)
520        return
521
522    try:
523        dump_file = open(dump_file_path, 'wb')
524    except IOError as e:
525        logging.error('Could not open file |%s| to dump exit error. '
526                      'Exception raised when opening file: %s',
527                      dump_file_path, str(e))
528        return
529
530    dump_file.write((str(exc) + '\n').encode('utf-8'))
531    dump_file.writelines(traceback.format_exc())
532    dump_file.close()
533
534
535def sig_handler(signum, frame):
536    """
537    Top level signal handler to handle user interrupt.
538
539    @param signum: The signal received.
540    @param frame: Ignored.
541    """
542    global _pseudo_modem_manager
543    logging.debug('Signal handler called with signal %d', signum)
544    if _pseudo_modem_manager:
545        _pseudo_modem_manager.GracefulExit()
546
547
548def main():
549    """
550    This is the entry point for raw pseudomodem.
551
552    You should not be running this module as a script. If you're trying to run
553    pseudomodem from the command line, see |pseudomodem_context| module.
554
555    """
556    global _pseudo_modem_manager
557
558    logging_setup.SetupLogging()
559
560    logging.info('Pseudomodem commandline: [%s]', str(sys.argv))
561    opts = ParseArguments()
562
563    signal.signal(signal.SIGINT, sig_handler)
564    signal.signal(signal.SIGTERM, sig_handler)
565
566    try:
567        _pseudo_modem_manager = PseudoModemManager()
568        _pseudo_modem_manager.Setup(opts)
569        _pseudo_modem_manager.StartBlocking()
570    except Exception as e:
571        logging.error('Caught exception at top level: %s', str(e))
572        _DumpExitError(opts.exit_error_file, e)
573        _pseudo_modem_manager.GracefulExit()
574        raise
575
576
577if __name__ == '__main__':
578    main()
579