# Lint as: python2, python3
# Copyright (c) 2021 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""
This module provides bindings for ModemManager1.

"""

import dbus
import dbus.mainloop.glib
import logging
import time

from autotest_lib.client.bin import utils
from autotest_lib.client.cros.cellular import mm1_constants


def _is_unknown_dbus_binding_exception(e):
    """
    Tell whether |e| is a DBus "unknown service/method/object/interface" error.

    Callers in this module treat these errors as "the remote object is not
    (yet) exported" and return None instead of raising.

    @param e: The exception to inspect.
    @return True if |e| is a DBusException whose DBus error name is one of
            the DBUS_*UNKNOWN* names in mm1_constants, False otherwise.

    """
    return (isinstance(e, dbus.exceptions.DBusException) and
            e.get_dbus_name() in [mm1_constants.DBUS_SERVICE_UNKNOWN,
                                  mm1_constants.DBUS_UNKNOWN_METHOD,
                                  mm1_constants.DBUS_UNKNOWN_OBJECT,
                                  mm1_constants.DBUS_UNKNOWN_INTERFACE])


class ModemManager1ProxyError(Exception):
    """Exceptions raised by ModemManager1Proxy and its children."""
    pass


class ModemManager1Proxy(object):
    """A wrapper around a DBus proxy for ModemManager1."""

    # Amount of time to wait between attempts to connect to ModemManager1.
    CONNECT_WAIT_INTERVAL_SECONDS = 0.2

    @classmethod
    def get_proxy(cls, bus=None, timeout_seconds=10):
        """Connect to ModemManager1 over DBus, retrying if necessary.

        After connecting to ModemManager1, this method will verify that
        ModemManager1 is answering RPCs.

        @param bus: D-Bus bus to use, or specify None and this object will
                create a mainloop and bus.
        @param timeout_seconds: float number of seconds to keep retrying the
                connection. The same value is used as the timeout of the
                SetLogging call that verifies ModemManager1 is responsive.
                NOTE(review): earlier documentation stated that a value <= 0
                returns immediately without trying to connect; that depends
                on utils.poll_for_condition - confirm.
        @return a ModemManager1Proxy instance (or an instance of the subclass
                this classmethod was called on).
        @raise ModemManager1ProxyError if it fails to connect to
                ModemManager1 before the timeout, or if a DBus error other
                than an "unknown binding" error occurs while connecting.

        """
        def _connect_to_mm1(bus):
            try:
                # We create instance of class on which this classmethod was
                # called. This way, calling
                # SubclassOfModemManager1Proxy.get_proxy() will get a proxy of
                # the right type.
                return cls(bus=bus)
            except dbus.exceptions.DBusException as e:
                if _is_unknown_dbus_binding_exception(e):
                    # ModemManager1 is not on the bus yet; keep polling.
                    return None
                raise ModemManager1ProxyError(
                        'Error connecting to ModemManager1. DBus error: |%s|' %
                        repr(e))

        # Keep the proxy produced by the successful poll iteration rather
        # than connecting a second time: a second attempt could return None
        # if ModemManager1 went away in between. This relies on
        # poll_for_condition returning the (truthy) value of the condition,
        # exactly as wait_for_modem() does.
        connection = utils.poll_for_condition(
                lambda: _connect_to_mm1(bus),
                exception=ModemManager1ProxyError(
                        'Timed out connecting to ModemManager1'),
                timeout=timeout_seconds,
                sleep_interval=cls.CONNECT_WAIT_INTERVAL_SECONDS)

        # Check to make sure ModemManager1 is responding to DBus requests by
        # setting the logging to debug.
        connection.manager.SetLogging('DEBUG', timeout=timeout_seconds)

        return connection

    def __init__(self, bus=None):
        """
        Build the DBus interface to the ModemManager1 manager object.

        @param bus: D-Bus bus to use. If None, a GLib main loop is installed
                as the default DBus main loop and the system bus is used.

        """
        if bus is None:
            dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
            bus = dbus.SystemBus()
        self._bus = bus
        self._manager = dbus.Interface(
                self._bus.get_object(mm1_constants.I_MODEM_MANAGER,
                                     mm1_constants.MM1),
                mm1_constants.I_MODEM_MANAGER)
        # 'Device' property of the modem most recently inhibited through
        # inhibit_device(); needed again to uninhibit it.
        self._device = None

    @property
    def manager(self):
        """@return the DBus ModemManager1 Manager object."""
        return self._manager

    def inhibit_device(self, inhibit):
        """
        Uses Modem Manager InhibitDevice DBus API to inhibit/uninhibit.

        @param inhibit: true to inhibit the modem and false to uninhibit it.
        @raise ModemManager1ProxyError if there is no manager, if uninhibit
                is requested before any inhibit, if no modem is available to
                inhibit, if no modem is found after uninhibit, or if a DBus
                error occurs.

        InhibitDevice API:
        @uid: the unique ID of the physical device, given in the
              #org.freedesktop.ModemManager1.Modem:Device property.
        @inhibit: %TRUE to inhibit the modem and %FALSE to uninhibit it.

        Inhibit or uninhibit the device.

        When the modem is inhibited ModemManager will close all its ports and
        unexport it from the bus, so that users of the interface are no longer
        able to operate with it.

        This operation binds the inhibition request to the existence of the
        caller in the DBus bus. If the caller disappears from the bus, the
        inhibition will automatically be removed.
        """
        try:
            if not self._manager:
                raise ModemManager1ProxyError(
                        'Failed to obtain dbus manager object.- No manager')
            if not inhibit and not self._device:
                raise ModemManager1ProxyError(
                        'Uninhibit called before inhibit %s' % self._device)

            if inhibit:
                modem = self.get_modem()
                if not modem:
                    raise ModemManager1ProxyError(
                            'Failed to to obtain dbus manager object. - No modem')

                # Remember the device uid: the modem object is unexported
                # while inhibited, so it cannot be queried at uninhibit time.
                self._device = modem.properties(
                        mm1_constants.I_MODEM).get('Device')

            logging.debug('device to be inhibited/uninhibited %s', self._device)
            self._manager.InhibitDevice(dbus.String(self._device), inhibit)

            logging.debug('inhibit=%r done with %s', inhibit, self._device)

            if inhibit:
                time.sleep(mm1_constants.MM_INHIBIT_PROCESSING_TIME)
            else:
                # |result| only exists on the uninhibit path, so the reprobe
                # wait and the check below belong to this branch.
                result = self.wait_for_modem(
                        mm1_constants.MM_UNINHIBIT_PROCESSING_TIME)

                time.sleep(mm1_constants.MM_REPROBE_PROCESSING_TIME)
                if result is None:
                    raise ModemManager1ProxyError('No modem after uninhibit')
        except dbus.exceptions.DBusException as e:
            raise ModemManager1ProxyError(
                    'Failed to to obtain dbus object for the modem. '
                    'DBus error: %s' % repr(e))

    def get_modem(self):
        """
        Return the one and only modem object.

        This method distinguishes between no modem and more than one modem.
        In the former, this could happen if the modem has not yet surfaced and
        is not really considered an error. The caller can wait for the modem
        by repeatedly calling this method. In the latter, it is a clear error
        condition and an exception will be raised.

        Every call to |get_modem| obtains a fresh DBus proxy for the modem. So,
        if the modem DBus object has changed between two calls to this method,
        the proxy returned will be for the currently exported modem.

        @return a ModemProxy object. Return None if no modem is found, or if
                the modem object is no longer exported when it is probed.
        @raise ModemManager1ProxyError if more than one modem is found, or if
                a DBus error occurs while listing or probing the modem.

        """
        try:
            object_manager = dbus.Interface(
                    self._bus.get_object(mm1_constants.I_MODEM_MANAGER,
                                         mm1_constants.MM1),
                    mm1_constants.I_OBJECT_MANAGER)
            modems = object_manager.GetManagedObjects()
        except dbus.exceptions.DBusException as e:
            raise ModemManager1ProxyError(
                    'Failed to list the available modems. DBus error: %s' %
                    repr(e))

        if not modems:
            return None
        elif len(modems) > 1:
            raise ModemManager1ProxyError(
                    'Expected one modem object, found %d' % len(modems))

        # Exactly one managed object: its key is the modem object path.
        modem_proxy = ModemProxy(self._bus, next(iter(modems)))
        # Check that this object is valid
        try:
            modem_proxy.modem.GetAll(mm1_constants.I_MODEM,
                                     dbus_interface=mm1_constants.I_PROPERTIES)
            return modem_proxy
        except dbus.exceptions.DBusException as e:
            if _is_unknown_dbus_binding_exception(e):
                return None
            raise ModemManager1ProxyError(
                    'Failed to obtain dbus object for the modem. DBus error: %s' %
                    repr(e))

    def wait_for_modem(self, timeout_seconds):
        """
        Wait for the modem to appear.

        @param timeout_seconds: Number of seconds to wait for modem to appear.
        @return a ModemProxy object.
        @raise ModemManager1ProxyError if no modem is found within the timeout
               or if more than one modem is found. NOTE: This method does not
               wait for a second modem. The exception is raised if there is
               more than one modem at the time of polling.

        """
        return utils.poll_for_condition(
                self.get_modem,
                exception=ModemManager1ProxyError('No modem found'),
                timeout=timeout_seconds)


class ModemProxy(object):
    """A wrapper around a DBus proxy for ModemManager1 modem object."""

    # Amount of time to wait for a state transition.
    STATE_TRANSITION_WAIT_SECONDS = 60

    def __init__(self, bus, path):
        """
        @param bus: D-Bus bus on which the modem object is exported.
        @param path: DBus object path of the modem.

        """
        self._bus = bus
        self._modem = self._bus.get_object(mm1_constants.I_MODEM_MANAGER, path)

    @property
    def modem(self):
        """@return the DBus modem object."""
        return self._modem

    @property
    def iface_modem(self):
        """@return org.freedesktop.ModemManager1.Modem DBus interface."""
        return dbus.Interface(self._modem, mm1_constants.I_MODEM)

    @property
    def iface_simple_modem(self):
        """@return org.freedesktop.ModemManager1.Simple DBus interface."""
        return dbus.Interface(self._modem, mm1_constants.I_MODEM_SIMPLE)

    @property
    def iface_gsm_modem(self):
        """@return org.freedesktop.ModemManager1.Modem3gpp DBus interface."""
        return dbus.Interface(self._modem, mm1_constants.I_MODEM_3GPP)

    @property
    def iface_cdma_modem(self):
        """@return org.freedesktop.ModemManager1.ModemCdma DBus interface."""
        return dbus.Interface(self._modem, mm1_constants.I_MODEM_CDMA)

    @property
    def iface_properties(self):
        """@return org.freedesktop.DBus.Properties DBus interface."""
        return dbus.Interface(self._modem, dbus.PROPERTIES_IFACE)

    def properties(self, iface):
        """Return the properties associated with the specified interface.

        @param iface: Name of interface to retrieve the properties from.
        @return array of properties.

        """
        return self.iface_properties.GetAll(iface)

    def get_sim(self):
        """
        Return the SIM proxy object associated with this modem.

        @return SimProxy object or None if no SIM exists.

        """
        sim_path = self.properties(mm1_constants.I_MODEM).get('Sim')
        if not sim_path:
            return None
        return self.get_sim_at_path(sim_path)

    def get_sim_at_path(self, sim_path):
        """
        Return the SIM proxy object associated with the sim_path.

        @param sim_path: DBus object path of the SIM.
        @return SimProxy object or None if no SIM exists.
        @raise ModemManager1ProxyError on a DBus error other than an
                "unknown binding" error.

        """
        sim_proxy = SimProxy(self._bus, sim_path)
        # Check that this object is valid
        try:
            sim_proxy.properties(mm1_constants.I_SIM)
            return sim_proxy
        except dbus.exceptions.DBusException as e:
            if _is_unknown_dbus_binding_exception(e):
                return None
            raise ModemManager1ProxyError(
                    'Failed to obtain dbus object for the SIM. DBus error: %s' %
                    repr(e))

    def get_sim_slots(self):
        """
        The list of SIM slots available in the system, including the SIM object
        paths if the cards are present. If a given SIM slot at a given index
        doesn't have a SIM card available, an empty object path will be given.

        The length of this array of objects will be equal to the amount of
        available SIM slots in the system, and the index in the array is the
        slot index.

        This list includes the SIM object considered as primary active SIM slot
        (#org.freedesktop.ModemManager1.Modem.Sim) at index
        #org.freedesktop.ModemManager1.Modem.ActiveSimSlot.

        @return list of SimSlot paths

        """
        return self.properties(mm1_constants.I_MODEM).get('SimSlots')

    def get_primary_sim_slot(self):
        """
        The index of the primary active SIM slot in the
        #org.freedesktop.ModemManager1.Modem.SimSlots array, given in the [1,N]
        range.

        If multiple SIM slots aren't supported, this property will report None

        In a Multi SIM Single Standby setup, this index identifies the only SIM
        that is currently active. All the remaining slots will be inactive.

        In a Multi SIM Multi Standby setup, this index identifies the active SIM
        that is considered primary, i.e. the one that will be used when a data
        connection is setup.

        @return current primary slot index

        """
        return self.properties(mm1_constants.I_MODEM).get('PrimarySimSlot')

    def set_primary_slot(self, sim_slot):
        """
        Selects which SIM slot to be considered as primary, on devices that
        expose multiple slots in the #org.freedesktop.ModemManager1.Modem
        :SimSlots property.

        When the switch happens the modem may require a full device reprobe,
        so the modem object in DBus will get removed, and recreated once the
        selected SIM slot is in use.

        There is no limitation on which SIM slot to select, so the user may
        also set as primary a slot that doesn't currently have any valid SIM
        card inserted.

        @param: sim_slot: SIM slot number to set as primary.
        @return: success or raise error

        """
        self.iface_modem.SetPrimarySimSlot(dbus.UInt32(sim_slot))

    def wait_for_states(self, states,
                        timeout_seconds=STATE_TRANSITION_WAIT_SECONDS):
        """
        Wait for the modem to transition to a state in |states|.

        This method does not support transitory states (eg. enabling,
        disabling, connecting, disconnecting, etc).

        @param states: List of states the modem can transition to.
        @param timeout_seconds: Max number of seconds to wait.
        @raise ModemManager1ProxyError if the modem does not transition to
               one of the accepted states, or if |states| contains a
               transitory state.

        """
        for state in states:
            if state in [mm1_constants.MM_MODEM_STATE_INITIALIZING,
                         mm1_constants.MM_MODEM_STATE_DISABLING,
                         mm1_constants.MM_MODEM_STATE_ENABLING,
                         mm1_constants.MM_MODEM_STATE_SEARCHING,
                         mm1_constants.MM_MODEM_STATE_DISCONNECTING,
                         mm1_constants.MM_MODEM_STATE_CONNECTING]:
                raise ModemManager1ProxyError(
                        'wait_for_states() does not support transitory states.')

        # One-element list so the nested function can record the most recent
        # state without 'nonlocal' (this file is also linted as python2).
        last_seen_state = [None]

        def _reached_wanted_state():
            last_seen_state[0] = self.properties(mm1_constants.I_MODEM)[
                    mm1_constants.MM_MODEM_PROPERTY_NAME_STATE]
            return last_seen_state[0] in states

        try:
            utils.poll_for_condition(
                    _reached_wanted_state,
                    exception=ModemManager1ProxyError(
                            'Timed out waiting for modem to enter one of these '
                            'states: %s' % (states,)),
                    timeout=timeout_seconds)
        except ModemManager1ProxyError:
            # The polled condition itself never raises ModemManager1ProxyError,
            # so this is the timeout. Build the message now so that it reports
            # the state last observed while polling rather than the state from
            # before the wait started.
            raise ModemManager1ProxyError(
                    'Timed out waiting for modem to enter one of these '
                    'states: %s, current state=%s' %
                    (states, last_seen_state[0]))


class SimProxy(object):
    """A wrapper around a DBus proxy for ModemManager1 SIM object."""

    def __init__(self, bus, path):
        """
        @param bus: D-Bus bus on which the SIM object is exported.
        @param path: DBus object path of the SIM.

        """
        self._bus = bus
        self._sim = self._bus.get_object(mm1_constants.I_MODEM_MANAGER, path)

    @property
    def sim(self):
        """@return the DBus SIM object."""
        return self._sim

    @property
    def iface_properties(self):
        """@return org.freedesktop.DBus.Properties DBus interface."""
        return dbus.Interface(self._sim, dbus.PROPERTIES_IFACE)

    @property
    def iface_sim(self):
        """@return org.freedesktop.ModemManager1.Sim DBus interface."""
        return dbus.Interface(self._sim, mm1_constants.I_SIM)

    def properties(self, iface=mm1_constants.I_SIM):
        """Return the properties associated with the specified interface.

        @param iface: Name of interface to retrieve the properties from.
        @return array of properties.

        """
        return self.iface_properties.GetAll(iface)