xref: /aosp_15_r20/external/autotest/client/cros/networking/mm1_proxy.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2# Copyright (c) 2021 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""
7This module provides bindings for ModemManager1.
8
9"""
10
11import dbus
12import dbus.mainloop.glib
13import logging
14import time
15
16from autotest_lib.client.bin import utils
17from autotest_lib.client.cros.cellular import mm1_constants
18
19
20def _is_unknown_dbus_binding_exception(e):
21    return (isinstance(e, dbus.exceptions.DBusException) and
22            e.get_dbus_name() in [mm1_constants.DBUS_SERVICE_UNKNOWN,
23                                  mm1_constants.DBUS_UNKNOWN_METHOD,
24                                  mm1_constants.DBUS_UNKNOWN_OBJECT,
25                                  mm1_constants.DBUS_UNKNOWN_INTERFACE])
26
27
28class ModemManager1ProxyError(Exception):
29    """Exceptions raised by ModemManager1ProxyError and it's children."""
30    pass
31
32
33class ModemManager1Proxy(object):
34    """A wrapper around a DBus proxy for ModemManager1."""
35
36    # Amount of time to wait between attempts to connect to ModemManager1.
37    CONNECT_WAIT_INTERVAL_SECONDS = 0.2
38
39    @classmethod
40    def get_proxy(cls, bus=None, timeout_seconds=10):
41        """Connect to ModemManager1 over DBus, retrying if necessary.
42
43        After connecting to ModemManager1, this method will verify that
44        ModemManager1 is answering RPCs.
45
46        @param bus: D-Bus bus to use, or specify None and this object will
47            create a mainloop and bus.
48        @param timeout_seconds: float number of seconds to try connecting
49            A value <= 0 will cause the method to return immediately,
50            without trying to connect.
51        @return a ModemManager1Proxy instance if we connected, or None
52            otherwise.
53        @raise ModemManager1ProxyError if it fails to connect to
54            ModemManager1.
55
56        """
57        def _connect_to_mm1(bus):
58            try:
59                # We create instance of class on which this classmethod was
60                # called. This way, calling
61                # SubclassOfModemManager1Proxy.get_proxy() will get a proxy of
62                # the right type.
63                return cls(bus=bus)
64            except dbus.exceptions.DBusException as e:
65                if _is_unknown_dbus_binding_exception(e):
66                    return None
67                raise ModemManager1ProxyError(
68                    'Error connecting to ModemManager1. DBus error: |%s|',
69                    repr(e))
70
71        utils.poll_for_condition(
72            lambda: _connect_to_mm1(bus) is not None,
73            exception=ModemManager1ProxyError(
74                'Timed out connecting to ModemManager1'),
75            timeout=timeout_seconds,
76            sleep_interval=ModemManager1Proxy.CONNECT_WAIT_INTERVAL_SECONDS)
77        connection = _connect_to_mm1(bus)
78
79        # Check to make sure ModemManager1 is responding to DBus requests by
80        # setting the logging to debug.
81        connection.manager.SetLogging('DEBUG', timeout=timeout_seconds)
82
83        return connection
84
85    def __init__(self, bus=None):
86        if bus is None:
87            dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
88            bus = dbus.SystemBus()
89        self._bus = bus
90        self._manager = dbus.Interface(
91            self._bus.get_object(mm1_constants.I_MODEM_MANAGER,
92                                 mm1_constants.MM1),
93            mm1_constants.I_MODEM_MANAGER)
94        self._device = None
95
96    @property
97    def manager(self):
98        """@return the DBus ModemManager1 Manager object."""
99        return self._manager
100
101    def inhibit_device(self, inhibit):
102        """
103
104        Uses Modem Manager InhibitDevice DBus API to inhibit/uninhibit
105        @param inhibit: true to inhibit the modem and false to uninhibit it.
106
107        InhibitDevice API:
108        @uid: the unique ID of the physical device, given in the
109              #org.freedesktop.ModemManager1.Modem:Device property.
110        @inhibit: %TRUE to inhibit the modem and %FALSE to uninhibit it.
111
112        Inhibit or uninhibit the device.
113
114        When the modem is inhibited ModemManager will close all its ports and
115        unexport it from the bus, so that users of the interface are no longer
116        able to operate with it.
117
118        This operation binds the inhibition request to the existence of the
119        caller in the DBus bus. If the caller disappears from the bus, the
120        inhibition will automatically removed.
121        """
122        try:
123            if not self._manager:
124                raise ModemManager1ProxyError(
125                    'Failed to obtain dbus manager object.- No manager')
126            if not inhibit and not self._device:
127                raise ModemManager1ProxyError(
128                    'Uninhibit called before inhibit %s' % self._device)
129
130            if inhibit:
131                modem = self.get_modem()
132                if not modem:
133                    raise ModemManager1ProxyError(
134                        'Failed to to obtain dbus manager object. - No modem')
135
136                self._device = modem.properties(
137                mm1_constants.I_MODEM).get('Device')
138
139            logging.debug('device to be inhibited/uninhibited %s', self._device)
140            self._manager.InhibitDevice(dbus.String(self._device), inhibit)
141
142            logging.debug('inhibit=%r done with %s', inhibit, self._device)
143
144            if inhibit:
145                time.sleep(mm1_constants.MM_INHIBIT_PROCESSING_TIME)
146            else:
147                result = self.wait_for_modem(
148                    mm1_constants.MM_UNINHIBIT_PROCESSING_TIME)
149
150                time.sleep(mm1_constants.MM_REPROBE_PROCESSING_TIME)
151                if result is None:
152                    raise ModemManager1ProxyError('No modem after uninhibit')
153        except dbus.exceptions.DBusException as e:
154            raise ModemManager1ProxyError(
155                'Failed to to obtain dbus object for the modem.'
156                'DBus error: %s' % repr(e))
157
158    def get_modem(self):
159        """
160        Return the one and only modem object.
161
162        This method distinguishes between no modem and more than one modem.
163        In the former, this could happen if the modem has not yet surfaced and
164        is not really considered an error. The caller can wait for the modem
165        by repeatedly calling this method. In the latter, it is a clear error
166        condition and an exception will be raised.
167
168        Every call to |get_modem| obtains a fresh DBus proxy for the modem. So,
169        if the modem DBus object has changed between two calls to this method,
170        the proxy returned will be for the currently exported modem.
171
172        @return a ModemProxy object.  Return None if no modem is found.
173        @raise ModemManager1ProxyError unless exactly one modem is found.
174
175        """
176        try:
177            object_manager = dbus.Interface(
178                self._bus.get_object(mm1_constants.I_MODEM_MANAGER,
179                                     mm1_constants.MM1),
180                mm1_constants.I_OBJECT_MANAGER)
181            modems = object_manager.GetManagedObjects()
182        except dbus.exceptions.DBusException as e:
183            raise ModemManager1ProxyError(
184                'Failed to list the available modems. DBus error: %s' %
185                repr(e))
186
187        if not modems:
188            return None
189        elif len(modems) > 1:
190            raise ModemManager1ProxyError(
191                'Expected one modem object, found %d' % len(modems))
192
193        modem_proxy = ModemProxy(self._bus, list(modems.keys())[0])
194        # Check that this object is valid
195        try:
196            modem_proxy.modem.GetAll(mm1_constants.I_MODEM,
197                                     dbus_interface=mm1_constants.I_PROPERTIES)
198            return modem_proxy
199        except dbus.exceptions.DBusException as e:
200            if _is_unknown_dbus_binding_exception(e):
201                return None
202            raise ModemManager1ProxyError(
203                'Failed to obtain dbus object for the modem. DBus error: %s' %
204                repr(e))
205
206    def wait_for_modem(self, timeout_seconds):
207        """
208        Wait for the modem to appear.
209
210        @param timeout_seconds: Number of seconds to wait for modem to appear.
211        @return a ModemProxy object.
212        @raise ModemManager1ProxyError if no modem is found within the timeout
213                or if more than one modem is found. NOTE: This method does not
214                wait for a second modem. The exception is raised if there is
215                more than one modem at the time of polling.
216
217        """
218        return utils.poll_for_condition(
219            self.get_modem,
220            exception=ModemManager1ProxyError('No modem found'),
221            timeout=timeout_seconds)
222
223
224class ModemProxy(object):
225    """A wrapper around a DBus proxy for ModemManager1 modem object."""
226
227    # Amount of time to wait for a state transition.
228    STATE_TRANSITION_WAIT_SECONDS = 60
229
230    def __init__(self, bus, path):
231        self._bus = bus
232        self._modem = self._bus.get_object(mm1_constants.I_MODEM_MANAGER, path)
233
234    @property
235    def modem(self):
236        """@return the DBus modem object."""
237        return self._modem
238
239    @property
240    def iface_modem(self):
241        """@return org.freedesktop.ModemManager1.Modem DBus interface."""
242        return dbus.Interface(self._modem, mm1_constants.I_MODEM)
243
244    @property
245    def iface_simple_modem(self):
246        """@return org.freedesktop.ModemManager1.Simple DBus interface."""
247        return dbus.Interface(self._modem, mm1_constants.I_MODEM_SIMPLE)
248
249    @property
250    def iface_gsm_modem(self):
251        """@return org.freedesktop.ModemManager1.Modem3gpp DBus interface."""
252        return dbus.Interface(self._modem, mm1_constants.I_MODEM_3GPP)
253
254    @property
255    def iface_cdma_modem(self):
256        """@return org.freedesktop.ModemManager1.ModemCdma DBus interface."""
257        return dbus.Interface(self._modem, mm1_constants.I_MODEM_CDMA)
258
259    @property
260    def iface_properties(self):
261        """@return org.freedesktop.DBus.Properties DBus interface."""
262        return dbus.Interface(self._modem, dbus.PROPERTIES_IFACE)
263
264    def properties(self, iface):
265        """Return the properties associated with the specified interface.
266
267        @param iface: Name of interface to retrieve the properties from.
268        @return array of properties.
269
270        """
271        return self.iface_properties.GetAll(iface)
272
273    def get_sim(self):
274        """
275        Return the SIM proxy object associated with this modem.
276
277        @return SimProxy object or None if no SIM exists.
278
279        """
280        sim_path = self.properties(mm1_constants.I_MODEM).get('Sim')
281        if not sim_path:
282            return None
283        return self.get_sim_at_path(sim_path)
284
285    def get_sim_at_path(self, sim_path):
286        """
287        Return the SIM proxy object associated with the sim_path.
288
289        @return SimProxy object or None if no SIM exists.
290
291        """
292        sim_proxy = SimProxy(self._bus, sim_path)
293        # Check that this object is valid
294        try:
295            sim_proxy.properties(mm1_constants.I_SIM)
296            return sim_proxy
297        except dbus.exceptions.DBusException as e:
298            if _is_unknown_dbus_binding_exception(e):
299                return None
300            raise ModemManager1ProxyError(
301                'Failed to obtain dbus object for the SIM. DBus error: %s' %
302                repr(e))
303
304    def get_sim_slots(self):
305        """
306        The list of SIM slots available in the system, including the SIM object
307        paths if the cards are present. If a given SIM slot at a given index
308        doesn't have a SIM card available, an empty object path will be given.
309
310        The length of this array of objects will be equal to the amount of
311        available SIM slots in the system, and the index in the array is the
312        slot index.
313
314        This list includes the SIM object considered as primary active SIM slot
315        (#org.freedesktop.ModemManager1.Modem.Sim) at index
316        #org.freedesktop.ModemManager1.Modem.ActiveSimSlot.
317
318        @return list of SimSlot paths
319
320        """
321        return self.properties(mm1_constants.I_MODEM).get('SimSlots')
322
323    def get_primary_sim_slot(self):
324        """
325        The index of the primary active SIM slot in the
326        #org.freedesktop.ModemManager1.Modem.SimSlots array, given in the [1,N]
327        range.
328
329        If multiple SIM slots aren't supported, this property will report None
330
331        In a Multi SIM Single Standby setup, this index identifies the only SIM
332        that is currently active. All the remaining slots will be inactive.
333
334        In a Multi SIM Multi Standby setup, this index identifies the active SIM
335        that is considered primary, i.e. the one that will be used when a data
336        connection is setup.
337
338        @return current primary slot index
339
340        """
341        return self.properties(mm1_constants.I_MODEM).get('PrimarySimSlot')
342
343    def set_primary_slot(self, sim_slot):
344        """
345        Selects which SIM slot to be considered as primary, on devices that
346        expose multiple slots in the #org.freedesktop.ModemManager1.Modem
347        :SimSlots property.
348
349        When the switch happens the modem may require a full device reprobe,
350        so the modem object in DBus will get removed, and recreated once the
351        selected SIM slot is in use.
352
353        There is no limitation on which SIM slot to select, so the user may
354        also set as primary a slot that doesn't currently have any valid SIM
355        card inserted.
356
357        @param: sim_slot: SIM slot number to set as primary.
358        @return: success or raise error
359
360        """
361        self.iface_modem.SetPrimarySimSlot(dbus.UInt32(sim_slot))
362
363    def wait_for_states(self, states,
364                        timeout_seconds=STATE_TRANSITION_WAIT_SECONDS):
365        """
366        Wait for the modem to transition to a state in |states|.
367
368        This method does not support transitory states (eg. enabling,
369        disabling, connecting, disconnecting, etc).
370
371        @param states: List of states the modem can transition to.
372        @param timeout_seconds: Max number of seconds to wait.
373        @raise ModemManager1ProxyError if the modem does not transition to
374            one of the accepted states.
375
376        """
377        for state in states:
378            if state in [mm1_constants.MM_MODEM_STATE_INITIALIZING,
379                         mm1_constants.MM_MODEM_STATE_DISABLING,
380                         mm1_constants.MM_MODEM_STATE_ENABLING,
381                         mm1_constants.MM_MODEM_STATE_SEARCHING,
382                         mm1_constants.MM_MODEM_STATE_DISCONNECTING,
383                         mm1_constants.MM_MODEM_STATE_CONNECTING]:
384                raise ModemManager1ProxyError(
385                    'wait_for_states() does not support transitory states.')
386
387        utils.poll_for_condition(
388            lambda: self.properties(mm1_constants.I_MODEM)[
389                mm1_constants.MM_MODEM_PROPERTY_NAME_STATE] in states,
390            exception=ModemManager1ProxyError(
391                'Timed out waiting for modem to enter one of these '
392                'states: %s, current state=%s' %
393                (states,
394                 self.properties(mm1_constants.I_MODEM)[
395                     mm1_constants.MM_MODEM_PROPERTY_NAME_STATE])),
396            timeout=timeout_seconds)
397
398class SimProxy(object):
399    """A wrapper around a DBus proxy for ModemManager1 SIM object."""
400
401    def __init__(self, bus, path):
402        self._bus = bus
403        self._sim = self._bus.get_object(mm1_constants.I_MODEM_MANAGER, path)
404
405    @property
406    def sim(self):
407        """@return the DBus SIM object."""
408        return self._sim
409
410    @property
411    def iface_properties(self):
412        """@return org.freedesktop.DBus.Properties DBus interface."""
413        return dbus.Interface(self._sim, dbus.PROPERTIES_IFACE)
414
415    @property
416    def iface_sim(self):
417        """@return org.freedesktop.ModemManager1.Sim DBus interface."""
418        return dbus.Interface(self._sim, mm1_constants.I_SIM)
419
420    def properties(self, iface=mm1_constants.I_SIM):
421        """Return the properties associated with the specified interface.
422
423        @param iface: Name of interface to retrieve the properties from.
424        @return array of properties.
425
426        """
427        return self.iface_properties.GetAll(iface)
428