# Lint as: python2, python3 # Copyright (c) 2013 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. from __future__ import absolute_import from __future__ import division from __future__ import print_function import cmd import dbus import dbus.types import dbus.exceptions import six from six.moves import input import common from autotest_lib.client.cros.cellular.pseudomodem import pm_constants from autotest_lib.client.cros.cellular import mm1_constants class PseudoModemClient(cmd.Cmd): """ Interactive client for PseudoModemManager. """ def __init__(self): cmd.Cmd.__init__(self) self.prompt = '> ' self._bus = dbus.SystemBus() def _get_proxy(self, path=pm_constants.TESTING_PATH): return self._bus.get_object(mm1_constants.I_MODEM_MANAGER, path) def _get_ism_proxy(self, state_machine): return self._get_proxy('/'.join([pm_constants.TESTING_PATH, state_machine])) def Begin(self): """ Starts the interactive shell. """ print('\nWelcome to the PseudoModemManager shell!\n') self.cmdloop() def can_exit(self): """Override""" return True def do_is_alive(self, args): """ Handles the 'is_alive' command. @params args: ignored. """ if args: print('\nCommand "is_alive" expects no arguments.\n') return print(self._get_proxy().IsAlive(dbus_interface=pm_constants.I_TESTING)) def help_is_alive(self): """ Handles the 'help is_alive' command. """ print('\nChecks that pseudomodem child process is alive.\n') def do_properties(self, args): """ Handles the 'properties' command. @param args: Arguments to the command. Unused. """ if args: print('\nCommand "properties" expects no arguments.\n') return try: props = self._get_proxy().GetAll( pm_constants.I_TESTING, dbus_interface=mm1_constants.I_PROPERTIES) print('\nProperties: ') for k, v in six.iteritems(props): print(' ' + k + ': ' + str(v)) print() except dbus.exceptions.DBusException as e: print(('\nAn error occurred while communicating with ' 'PseudoModemManager: ' + e.get_dbus_name() + ' - ' + e.message + '\n')) return False def help_properties(self): """Handles the 'help properties' command.""" print('\nReturns the properties under the testing interface.\n') def do_sms(self, args): """ Simulates a received SMS. @param args: A string containing the sender and the text message content, in which everything before the first ' ' character belongs to the sender and everything else belongs to the message content. For example "Gandalf You shall not pass!" will be parsed into: sender="Gandalf" content="You shall not pass!" Pseudomodem doesn't distinguish between phone numbers and strings containing non-numeric characters for the sender field so args can contain pretty much anything. """ arglist = args.split(' ', 1) if len(arglist) != 2: print('\nMalformed SMS args: ' + args + '\n') return try: self._get_proxy().ReceiveSms( arglist[0], arglist[1], dbus_interface=pm_constants.I_TESTING) print('\nSMS sent!\n') except dbus.exceptions.DBusException as e: print(('\nAn error occurred while communicating with ' 'PseudoModemManager: ' + e.get_dbus_name() + ' - ' + e.message + '\n')) return False def help_sms(self): """Handles the 'help sms' command.""" print('\nUsage: sms \n') def do_set(self, args): """ Handles various commands that start with 'set'. @param args: Defines the set command to be issued and its arguments. Currently supported commands are: set pco """ arglist = args.split(' ') if len(arglist) < 1: print('\nInvalid command: set ' + args + '\n') return if arglist[0] == 'pco': if len(arglist) == 1: arglist.append('') elif len(arglist) != 2: print('\nExpected: pco . Found: ' + args + '\n') return pco_value = arglist[1] try: pco_list = [dbus.types.Struct( [dbus.types.UInt32(1), dbus.types.Boolean(True), dbus.types.ByteArray(pco_value)], signature='ubay')] self._get_proxy().UpdatePco( pco_list, dbus_interface=pm_constants.I_TESTING) print('\nPCO value updated!\n') except dbus.exceptions.DBusException as e: print(('\nAn error occurred while communicating with ' 'PseudoModemManager: ' + e.get_dbus_name() + ' - ' + e.message + '\n')) else: print('\nUnknown command: set ' + args + '\n') return False def help_set(self): """Handles the 'help set' command.""" print ('\nUsage: set pco \n can be empty to set' ' the PCO value to an empty list.\n') def _get_state_machine(self, args): arglist = args.split() if len(arglist) != 1: print('\nExpected one argument: Name of state machine\n') return None try: return self._get_ism_proxy(arglist[0]) except dbus.exceptions.DBusException as e: print('\nNo such interactive state machine.\n') print('Error obtained: |%s|\n' % repr(e)) return None def do_is_waiting(self, machine): """ Determine if a machine is waiting for an advance call. @param machine: Case sensitive name of the machine. @return: True if |machine| is waiting to be advanced by the user. """ ism = self._get_state_machine(machine) if not ism: return False try: is_waiting = ism.IsWaiting( dbus_interface=pm_constants.I_TESTING_ISM) print(('\nState machine is %swaiting.\n' % ('' if is_waiting else 'not '))) except dbus.exceptions.DBusException as e: print(('\nCould not determine if |%s| is waiting: |%s|\n' % (machine, repr(e)))) return False def help_is_waiting(self): """Handles the 'help is_waiting' command""" print ('\nUsage: is_waiting \n' 'Check whether a state machine is waiting for user action. The ' 'waiting machine can be advanced using the |advance| command.\n' 'state-machine-name is the case sensitive name of the machine' 'whose status is to be queried.\n') def do_advance(self, machine): """ Advance the given state machine. @param machine: Case sensitive name of the state machine to advance. @returns: True if |machine| was successfully advanced, False otherwise. """ ism = self._get_state_machine(machine) if not ism: return False try: success = ism.Advance(dbus_interface=pm_constants.I_TESTING_ISM) print(('\nAdvanced!\n' if success else '\nCould not advance.\n')) except dbus.exceptions.DBusException as e: print('\nError while advancing state machine: |%s|\n' % repr(e)) return False def help_advance(self): """Handles the 'help advance' command""" print ('\nUsage: advance \n' 'Advance a waiting state machine to the next step.\n' 'state-machine-name is the case sensitive name of the machine' 'to advance.\n') def do_exit(self, args): """ Handles the 'exit' command. @param args: Arguments to the command. Unused. """ if args: print('\nCommand "exit" expects no arguments.\n') return resp = input('Are you sure? (yes/no): ') if resp == 'yes': print('\nGoodbye!\n') return True if resp != 'no': print('\nDid not understand: ' + resp + '\n') return False def help_exit(self): """Handles the 'help exit' command.""" print ('\nExits the interpreter. Shuts down the pseudo modem manager ' 'if the interpreter was launched by running pseudomodem.py') do_EOF = do_exit help_EOF = help_exit def main(): """ main method, run when this module is executed as stand-alone. """ client = PseudoModemClient() client.Begin() if __name__ == '__main__': main()