xref: /aosp_15_r20/external/autotest/client/cros/cellular/pseudomodem/client.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1
2# Lint as: python2, python3
3# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7from __future__ import absolute_import
8from __future__ import division
9from __future__ import print_function
10
11import cmd
12import dbus
13import dbus.types
14import dbus.exceptions
15
16import six
17
18from six.moves import input
19
20import common
21
22from autotest_lib.client.cros.cellular.pseudomodem import pm_constants
23
24from autotest_lib.client.cros.cellular import mm1_constants
25
class PseudoModemClient(cmd.Cmd):
    """
    Interactive client for PseudoModemManager.

    Every do_<name> method implements the shell command <name> and every
    help_<name> method prints the text shown by 'help <name>'. cmd.Cmd stops
    its command loop as soon as a command handler returns a true value, so
    all handlers return False except for a confirmed 'exit'.

    """
    def __init__(self):
        cmd.Cmd.__init__(self)
        self.prompt = '> '
        self._bus = dbus.SystemBus()


    def _get_proxy(self, path=pm_constants.TESTING_PATH):
        """
        Returns a D-Bus proxy for an object exported by the modem manager.

        @param path: Object path of the object to proxy. Defaults to the
                pseudomodem testing object.
        @return: The proxy object obtained from the system bus.

        """
        return self._bus.get_object(mm1_constants.I_MODEM_MANAGER, path)


    def _get_ism_proxy(self, state_machine):
        """
        Returns a D-Bus proxy for an interactive state machine.

        @param state_machine: Name of the state machine. It is appended to
                the testing object path to form the object path.
        @return: The proxy object obtained from the system bus.

        """
        return self._get_proxy('/'.join([pm_constants.TESTING_PATH,
                                         state_machine]))


    def _print_dbus_error(self, error):
        """
        Prints a D-Bus communication failure in a uniform format.

        The text is built with %-formatting because get_dbus_name() may
        return None, and the message is read through get_dbus_message()
        because exceptions have no 'message' attribute on Python 3.

        @param error: The dbus.exceptions.DBusException that was caught.

        """
        print('\nAn error occurred while communicating with '
              'PseudoModemManager: %s - %s\n' %
              (error.get_dbus_name(), error.get_dbus_message()))


    def Begin(self):
        """
        Starts the interactive shell.

        """
        print('\nWelcome to the PseudoModemManager shell!\n')
        self.cmdloop()


    def can_exit(self):
        """
        Reports that the shell may be exited.

        NOTE(review): cmd.Cmd itself defines no can_exit hook; presumably
        this is consulted by a caller outside this file -- confirm.

        @return: Always True.

        """
        return True


    def do_is_alive(self, args):
        """
        Handles the 'is_alive' command.

        @param args: Arguments to the command. Must be empty.
        @return: False, so that the shell keeps running.

        """
        if args:
            print('\nCommand "is_alive" expects no arguments.\n')
            return False
        try:
            print(self._get_proxy().IsAlive(
                    dbus_interface=pm_constants.I_TESTING))
        except dbus.exceptions.DBusException as e:
            # Report the failure like every other command does instead of
            # letting the exception terminate the shell.
            self._print_dbus_error(e)
        return False


    def help_is_alive(self):
        """ Handles the 'help is_alive' command. """
        print('\nChecks that pseudomodem child process is alive.\n')


    def do_properties(self, args):
        """
        Handles the 'properties' command.

        Prints every property exposed under the testing interface.

        @param args: Arguments to the command. Must be empty.
        @return: False, so that the shell keeps running.

        """
        if args:
            print('\nCommand "properties" expects no arguments.\n')
            return False
        try:
            props = self._get_proxy().GetAll(
                            pm_constants.I_TESTING,
                            dbus_interface=mm1_constants.I_PROPERTIES)
            print('\nProperties: ')
            for k, v in six.iteritems(props):
                print('   %s: %s' % (k, v))
            print()
        except dbus.exceptions.DBusException as e:
            self._print_dbus_error(e)
        return False


    def help_properties(self):
        """Handles the 'help properties' command."""
        print('\nReturns the properties under the testing interface.\n')


    def do_sms(self, args):
        """
        Simulates a received SMS.

        @param args: A string containing the sender and the text message
                content, in which everything before the first ' ' character
                belongs to the sender and everything else belongs to the
                message content. For example "Gandalf You shall not pass!"
                will be parsed into:

                    sender="Gandalf"
                    content="You shall not pass!"

                Pseudomodem doesn't distinguish between phone numbers and
                strings containing non-numeric characters for the sender field
                so args can contain pretty much anything.
        @return: False, so that the shell keeps running.

        """
        arglist = args.split(' ', 1)
        if len(arglist) != 2:
            print('\nMalformed SMS args: ' + args + '\n')
            return False
        sender, content = arglist
        try:
            self._get_proxy().ReceiveSms(
                    sender, content,
                    dbus_interface=pm_constants.I_TESTING)
            print('\nSMS sent!\n')
        except dbus.exceptions.DBusException as e:
            self._print_dbus_error(e)
        return False


    def help_sms(self):
        """Handles the 'help sms' command."""
        print('\nUsage: sms <sender phone #> <message text>\n')


    def do_set(self, args):
        """
        Handles various commands that start with 'set'.

        @param args: Defines the set command to be issued and its
                arguments. Currently supported commands are:

                  set pco <pco-value>

                <pco-value> may be omitted, in which case an empty value is
                sent.
        @return: False, so that the shell keeps running.

        """
        # Split on any run of whitespace so that a bare 'set' yields an empty
        # list and repeated spaces do not produce empty tokens.
        arglist = args.split()
        if not arglist:
            print('\nInvalid command: set ' + args + '\n')
            return False
        if arglist[0] != 'pco':
            print('\nUnknown command: set ' + args + '\n')
            return False
        if len(arglist) > 2:
            print('\nExpected: pco <pco-value>. Found: ' + args + '\n')
            return False

        pco_value = arglist[1] if len(arglist) == 2 else ''
        if isinstance(pco_value, six.text_type):
            # dbus.types.ByteArray is a bytes subclass on Python 3 and cannot
            # be constructed directly from text.
            pco_value = pco_value.encode('utf-8')
        try:
            pco_list = [dbus.types.Struct(
                [dbus.types.UInt32(1),
                    dbus.types.Boolean(True),
                    dbus.types.ByteArray(pco_value)],
                signature='ubay')]
            self._get_proxy().UpdatePco(
                    pco_list, dbus_interface=pm_constants.I_TESTING)
            print('\nPCO value updated!\n')
        except dbus.exceptions.DBusException as e:
            self._print_dbus_error(e)
        return False


    def help_set(self):
        """Handles the 'help set' command."""
        print('\nUsage: set pco <pco-value>\n<pco-value> can be empty to set'
              ' the PCO value to an empty list.\n')


    def _get_state_machine(self, args):
        """
        Resolves the argument of a state machine command to a D-Bus proxy.

        @param args: Raw command arguments. Must contain exactly one
                whitespace-delimited token, the name of the state machine.
        @return: The proxy for the state machine, or None (after printing
                an explanation) if the arguments are malformed or the proxy
                could not be obtained.

        """
        arglist = args.split()
        if len(arglist) != 1:
            print('\nExpected one argument: Name of state machine\n')
            return None
        try:
            return self._get_ism_proxy(arglist[0])
        except dbus.exceptions.DBusException as e:
            print('\nNo such interactive state machine.\n')
            print('Error obtained: |%s|\n' % repr(e))
            return None


    def do_is_waiting(self, machine):
        """
        Prints whether a machine is waiting for an advance call.

        @param machine: Case sensitive name of the machine.
        @return: False, so that the shell keeps running. The answer itself
                is only printed.

        """
        ism = self._get_state_machine(machine)
        if not ism:
            return False

        try:
            is_waiting = ism.IsWaiting(
                    dbus_interface=pm_constants.I_TESTING_ISM)
            print('\nState machine is %swaiting.\n' %
                  ('' if is_waiting else 'not '))
        except dbus.exceptions.DBusException as e:
            print('\nCould not determine if |%s| is waiting: |%s|\n' %
                  (machine, repr(e)))
        return False


    def help_is_waiting(self):
        """Handles the 'help is_waiting' command"""
        print('\nUsage: is_waiting <state-machine-name>\n'
              'Check whether a state machine is waiting for user action. The '
              'waiting machine can be advanced using the |advance| command.\n'
              'state-machine-name is the case sensitive name of the machine '
              'whose status is to be queried.\n')


    def do_advance(self, machine):
        """
        Advance the given state machine.

        @param machine: Case sensitive name of the state machine to advance.
        @return: False, so that the shell keeps running. Whether the machine
                was advanced is only printed.

        """
        ism = self._get_state_machine(machine)
        if not ism:
            return False

        try:
            success = ism.Advance(dbus_interface=pm_constants.I_TESTING_ISM)
            print('\nAdvanced!\n' if success else '\nCould not advance.\n')
        except dbus.exceptions.DBusException as e:
            print('\nError while advancing state machine: |%s|\n' % repr(e))
        return False


    def help_advance(self):
        """Handles the 'help advance' command"""
        print('\nUsage: advance <state-machine-name>\n'
              'Advance a waiting state machine to the next step.\n'
              'state-machine-name is the case sensitive name of the machine '
              'to advance.\n')


    def do_exit(self, args):
        """
        Handles the 'exit' command, and end-of-file on the input.

        @param args: Arguments to the command. Must be empty.
        @return: True if the shell should terminate, False otherwise.

        """
        if args:
            print('\nCommand "exit" expects no arguments.\n')
            return False
        try:
            resp = input('Are you sure? (yes/no): ')
        except EOFError:
            # The input is exhausted, so no confirmation can be read. Exit
            # cleanly rather than crash with a traceback.
            print('\nGoodbye!\n')
            return True
        if resp == 'yes':
            print('\nGoodbye!\n')
            return True
        if resp != 'no':
            print('\nDid not understand: ' + resp + '\n')
        return False


    def help_exit(self):
        """Handles the 'help exit' command."""
        print('\nExits the interpreter. Shuts down the pseudo modem manager '
              'if the interpreter was launched by running pseudomodem.py')


    # cmd.Cmd dispatches end-of-file on the input as the 'EOF' command.
    do_EOF = do_exit
    help_EOF = help_exit
292
293
def main():
    """ Entry point used when this module is executed as stand-alone. """
    PseudoModemClient().Begin()
298
299
# Start the interactive shell only when this file is run as a script, not
# when it is imported as a module.
if __name__ == '__main__':
    main()
302