xref: /aosp_15_r20/external/autotest/client/cros/cellular/pseudomodem/connect_machine.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6from __future__ import absolute_import
7from __future__ import division
8from __future__ import print_function
9
10import logging
11import subprocess
12
13import six
14
15from autotest_lib.client.cros.cellular.pseudomodem import pm_errors
16from autotest_lib.client.cros.cellular.pseudomodem import state_machine
17from autotest_lib.client.cros.cellular import mm1_constants
18
class ConnectMachine(state_machine.StateMachine):
    """
    ConnectMachine handles the state transitions involved in bringing the modem
    to the CONNECTED state.

    Depending on the state the modem reports, the machine enables the modem,
    registers it with the network, moves it to CONNECTING, activates a bearer
    that matches the requested connect properties (creating one when none
    matches) and finally hands the bearer path to |return_cb|. Failures are
    reported through |raise_cb|.

    Every _Handle*State method returns a boolean. Judging by the handlers in
    this file, True asks the StateMachine base class to keep stepping and
    False ends the machine; the base class is defined elsewhere, so confirm
    the exact contract there.

    """
    def __init__(self, modem, properties, return_cb, raise_cb):
        """
        @param modem: Modem object this machine operates on. It is passed to
                the StateMachine constructor; the rest of this class reaches
                it as self._modem.
        @param properties: Connect properties. The entries listed in
                modem.ALLOWED_BEARER_PROPERTIES are used to look for an
                existing bearer; the full set is passed to CreateBearer when
                no bearer matches.
        @param return_cb: Called with the bearer path once the modem has been
                moved to CONNECTED.
        @param raise_cb: Called with an error object when the connect
                operation is rejected or fails.

        """
        super(ConnectMachine, self).__init__(modem)
        self.connect_props = properties
        self.return_cb = return_cb
        self.raise_cb = raise_cb
        # Set once this machine has asked the modem to enable / register.
        # Seeing DISABLED / ENABLED again afterwards means that step failed.
        self.enable_initiated = False
        self.register_initiated = False


    @staticmethod
    def _IgnoreRegisterResult(*args, **kwargs):
        """
        No-op callback handed to RegisterWithNetwork.

        The connect machine learns the outcome of a registration from the
        modem state it observes on later steps, so the result reported
        through the callback is deliberately dropped.

        """


    def _RejectStart(self, error_code, message):
        """
        Logs |message|, reports it through |raise_cb| and refuses to start.

        @param error_code: pm_errors.MMCoreError code describing the refusal.
        @param message: Human readable reason, used for the log and the error.
        @returns: False, so callers can return the result directly.

        """
        logging.error(message)
        self.raise_cb(pm_errors.MMCoreError(error_code, message))
        return False


    def Cancel(self):
        """
        Overridden from superclass.

        After the superclass has cancelled the machine, a modem that is
        CONNECTING is put back to REGISTERED. Otherwise, if this machine
        started an enable and the modem still has an enable step, that step
        is cancelled as well. The modem's connect_step is cleared in all
        cases.

        """
        logging.info('ConnectMachine: Canceling connect.')
        super(ConnectMachine, self).Cancel()
        state = self._modem.Get(mm1_constants.I_MODEM, 'State')
        if state == mm1_constants.MM_MODEM_STATE_CONNECTING:
            logging.info('ConnectMachine: Setting state to REGISTERED.')
            self._modem.ChangeState(
                    mm1_constants.MM_MODEM_STATE_REGISTERED,
                    mm1_constants.MM_MODEM_STATE_CHANGE_REASON_USER_REQUESTED)
        elif self.enable_initiated and self._modem.enable_step:
            self._modem.enable_step.Cancel()
        self._modem.connect_step = None


    def _HandleDisabledState(self):
        """
        Handles MM_MODEM_STATE_DISABLED.

        The first visit starts enabling the modem. A later visit means the
        enable this machine started did not succeed, so the connect is
        cancelled and an MMCoreError.FAILED is reported through |raise_cb|.

        @returns: True after initiating the enable, False after a failure.

        """
        logging.info('ConnectMachine: Modem is DISABLED.')
        assert not self._modem.IsPendingEnable()
        if self.enable_initiated:
            message = 'ConnectMachine: Failed to enable modem.'
            logging.error(message)
            # Cancel() also clears self._modem.connect_step.
            self.Cancel()
            self.raise_cb(pm_errors.MMCoreError(
                    pm_errors.MMCoreError.FAILED, message))
            return False

        logging.info('ConnectMachine: Initiating Enable.')
        self.enable_initiated = True
        self._modem.Enable(True)

        # state machine will spin until modem gets enabled,
        # or if enable fails
        return True


    def _HandleEnablingState(self):
        """
        Handles MM_MODEM_STATE_ENABLING by waiting for the pending enable.

        @returns: True.

        """
        logging.info('ConnectMachine: Modem is ENABLING.')
        assert self._modem.IsPendingEnable()
        logging.info('ConnectMachine: Waiting for enable.')
        return True


    def _HandleEnabledState(self):
        """
        Handles MM_MODEM_STATE_ENABLED.

        The first visit makes sure a network registration is under way,
        starting one unless the modem already has one pending. A later visit
        means the registration did not succeed, so the connect is cancelled
        and an MMCoreError.FAILED is reported through |raise_cb|.

        @returns: True while waiting for registration, False after a failure.

        """
        logging.info('ConnectMachine: Modem is ENABLED.')

        if self.register_initiated:
            message = 'ConnectMachine: Failed to register.'
            logging.error(message)
            # Cancel() also clears self._modem.connect_step.
            self.Cancel()
            self.raise_cb(pm_errors.MMCoreError(pm_errors.MMCoreError.FAILED,
                                                message))
            return False

        logging.info('ConnectMachine: Waiting for Register.')
        if not self._modem.IsPendingRegister():
            # The connect callbacks are not forwarded to the registration:
            # a successful registration is not yet a successful connect, and
            # a failed one is reported by this handler once the modem is seen
            # in ENABLED again. This class never assigns _return_cb or
            # _raise_cb (only return_cb / raise_cb), so no-op callbacks are
            # passed instead.
            # NOTE(review): assumes RegisterWithNetwork only invokes its
            # callbacks -- confirm against its definition.
            self._modem.RegisterWithNetwork(
                    "", self._IgnoreRegisterResult, self._IgnoreRegisterResult)
        self.register_initiated = True
        return True


    def _HandleSearchingState(self):
        """
        Handles MM_MODEM_STATE_SEARCHING by waiting for the registration.

        @returns: True.

        """
        logging.info('ConnectMachine: Modem is SEARCHING.')
        logging.info('ConnectMachine: Waiting for modem to register.')
        assert self.register_initiated
        assert self._modem.IsPendingRegister()
        return True


    def _HandleRegisteredState(self):
        """
        Handles MM_MODEM_STATE_REGISTERED by moving the modem to CONNECTING.

        @returns: True.

        """
        logging.info('ConnectMachine: Modem is REGISTERED.')
        assert not self._modem.IsPendingDisconnect()
        assert not self._modem.IsPendingEnable()
        assert not self._modem.IsPendingDisable()
        assert not self._modem.IsPendingRegister()
        logging.info('ConnectMachine: Setting state to CONNECTING.')
        self._modem.ChangeState(
                mm1_constants.MM_MODEM_STATE_CONNECTING,
                mm1_constants.MM_MODEM_STATE_CHANGE_REASON_USER_REQUESTED)
        return True


    def _GetBearerToActivate(self):
        """
        Finds, or creates, the bearer that the connect should activate.

        An existing bearer is reused when its bearer_properties equal the
        connect properties restricted to modem.ALLOWED_BEARER_PROPERTIES.
        Otherwise a new bearer is created from the full connect properties.

        @returns: The path of the bearer to activate.

        """
        # Import modem here to avoid circular imports.
        import modem

        # The filtered properties do not depend on the bearer being examined,
        # so they are computed once up front.
        wanted_props = {
                key: val
                for key, val in six.iteritems(self.connect_props)
                if key in modem.ALLOWED_BEARER_PROPERTIES}

        for path, bearer in six.iteritems(self._modem.bearers):
            if bearer.bearer_properties == wanted_props:
                logging.info('ConnectMachine: Found matching bearer.')
                return path

        logging.info('ConnectMachine: No matching bearer found, '
                     'creating brearer with properties: %s',
                     str(self.connect_props))
        return self._modem.CreateBearer(self.connect_props)


    def _HandleConnectingState(self):
        """
        Handles MM_MODEM_STATE_CONNECTING by activating a bearer.

        On success the modem is moved to CONNECTED and the bearer path is
        passed to |return_cb|. If an MMError or CalledProcessError is raised
        along the way, the error is passed to |raise_cb| and the modem is put
        back to REGISTERED. Either way the modem's connect_step is cleared.

        @returns: False.

        """
        logging.info('ConnectMachine: Modem is CONNECTING.')
        assert not self._modem.IsPendingDisconnect()
        assert not self._modem.IsPendingEnable()
        assert not self._modem.IsPendingDisable()
        assert not self._modem.IsPendingRegister()
        try:
            bearer_path = self._GetBearerToActivate()
            self._modem.ActivateBearer(bearer_path)
            logging.info('ConnectMachine: Setting state to CONNECTED.')
            self._modem.ChangeState(
                    mm1_constants.MM_MODEM_STATE_CONNECTED,
                    mm1_constants.MM_MODEM_STATE_CHANGE_REASON_USER_REQUESTED)
            self._modem.connect_step = None
            logging.info(
                'ConnectMachine: Returning bearer path: %s', bearer_path)
            self.return_cb(bearer_path)
        except (pm_errors.MMError, subprocess.CalledProcessError) as e:
            logging.error('ConnectMachine: Failed to connect: %s', e)
            self.raise_cb(e)
            self._modem.ChangeState(
                    mm1_constants.MM_MODEM_STATE_REGISTERED,
                    mm1_constants.MM_MODEM_STATE_CHANGE_REASON_UNKNOWN)
            self._modem.connect_step = None
        return False


    def _GetModemStateFunctionMap(self):
        """
        @returns: A dictionary mapping each modem state this machine services
                to the ConnectMachine function that handles it.

        """
        return {
            mm1_constants.MM_MODEM_STATE_DISABLED:
                    ConnectMachine._HandleDisabledState,
            mm1_constants.MM_MODEM_STATE_ENABLING:
                    ConnectMachine._HandleEnablingState,
            mm1_constants.MM_MODEM_STATE_ENABLED:
                    ConnectMachine._HandleEnabledState,
            mm1_constants.MM_MODEM_STATE_SEARCHING:
                    ConnectMachine._HandleSearchingState,
            mm1_constants.MM_MODEM_STATE_REGISTERED:
                    ConnectMachine._HandleRegisteredState,
            mm1_constants.MM_MODEM_STATE_CONNECTING:
                    ConnectMachine._HandleConnectingState
        }


    def _ShouldStartStateMachine(self):
        """
        Decides whether this connect operation may run.

        The connect is refused, with the reason reported through |raise_cb|,
        when another connect machine is registered on the modem
        (IN_PROGRESS), when the modem is being disabled (WRONG_STATE), when
        it is already connected (CONNECTED) or when it is disconnecting
        (WRONG_STATE). Otherwise this machine registers itself as the modem's
        connect_step, unless it already is.

        @returns: True if the machine may run, False if it was refused.

        """
        if self._modem.connect_step and self._modem.connect_step != self:
            # There is already a connect operation in progress.
            return self._RejectStart(
                    pm_errors.MMCoreError.IN_PROGRESS,
                    'There is already an ongoing connect operation.')

        if self._modem.connect_step is None:
            # There is no connect operation going on, cancelled or otherwise.
            if self._modem.IsPendingDisable():
                return self._RejectStart(
                        pm_errors.MMCoreError.WRONG_STATE,
                        'Modem is currently being disabled. Ignoring '
                        'connect.')

            state = self._modem.Get(mm1_constants.I_MODEM, 'State')
            if state == mm1_constants.MM_MODEM_STATE_CONNECTED:
                return self._RejectStart(pm_errors.MMCoreError.CONNECTED,
                                         'Modem is already connected.')
            if state == mm1_constants.MM_MODEM_STATE_DISCONNECTING:
                assert self._modem.IsPendingDisconnect()
                return self._RejectStart(
                        pm_errors.MMCoreError.WRONG_STATE,
                        'Cannot connect while disconnecting.')

            logging.info('Starting Connect.')
            self._modem.connect_step = self
        return True
228