1# Lint as: python2, python3 2# Copyright 2021 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5"""Client class to access the Floss manager interface.""" 6 7from __future__ import absolute_import 8from __future__ import division 9from __future__ import print_function 10 11import math 12import random 13 14from autotest_lib.client.cros.bluetooth.floss.observer_base import ObserverBase 15from autotest_lib.client.cros.bluetooth.floss.utils import glib_call, glib_callback 16 17 18class ManagerCallbacks: 19 """Callbacks for the Manager Interface. 20 21 Implement this to observe these callbacks when exporting callbacks via 22 register_callback. 23 """ 24 def on_hci_device_changed(self, hci, present): 25 """Hci device presence is updated. 26 27 @param hci: Hci interface number. 28 @param present: Whether this hci interface is appearing or disappearing. 29 """ 30 pass 31 32 def on_hci_enabled_changed(self, hci, enabled): 33 """Hci device is being enabled or disabled. 34 35 @param hci: Hci interface number. 36 @param enabled: Whether this hci interface is being enabled or disabled. 37 """ 38 pass 39 40 41class FlossManagerClient(ManagerCallbacks): 42 """ Handles method calls to and callbacks from the Manager interface.""" 43 44 MGR_SERVICE = 'org.chromium.bluetooth.Manager' 45 MGR_INTERFACE = 'org.chromium.bluetooth.Manager' 46 MGR_OBJECT = '/org/chromium/bluetooth/Manager' 47 48 # Exported callback interface and objects 49 CB_EXPORTED_INTF = 'org.chromium.bluetooth.ManagerCallbacks' 50 CB_EXPORTED_OBJ = '/org/chromium/bluetooth/test_manager_client{}' 51 52 class AdaptersNotParseable(Exception): 53 """An entry in the result of GetAvailableAdapters was not parseable.""" 54 pass 55 56 class ExportedManagerCallbacks(ObserverBase): 57 """ 58 <node> 59 <interface name="org.chromium.bluetooth.ManagerCallbacks"> 60 <method name="OnHciDeviceChanged"> 61 <arg type="i" name="hci" direction="in" /> 62 <arg type="b" name="present" direction="in" /> 63 </method> 64 <method name="OnHciEnabledChanged"> 65 <arg type="i" name="hci" direction="in" /> 66 <arg type="b" name="enabled" direction="in" /> 67 </method> 68 </interface> 69 </node> 70 """ 71 def __init__(self): 72 """Construct exported callbacks object. 73 """ 74 ObserverBase.__init__(self) 75 76 def OnHciDeviceChanged(self, hci, present): 77 """Handle device presence callbacks.""" 78 for observer in self.observers.values(): 79 observer.on_hci_device_changed(hci, present) 80 81 def OnHciEnabledChanged(self, hci, enabled): 82 """Handle device enabled callbacks.""" 83 for observer in self.observers.values(): 84 observer.on_hci_enabled_changed(hci, enabled) 85 86 def __init__(self, bus): 87 """ Construct the client. 88 89 @param bus: DBus bus over which we'll establish connections. 90 """ 91 self.bus = bus 92 93 # We don't register callbacks by default. The client owner must call 94 # register_callbacks to do so. 95 self.callbacks = None 96 97 # Initialize hci devices and their power states 98 self.adapters = {} 99 100 def __del__(self): 101 """Destructor""" 102 del self.callbacks 103 104 @glib_call(False) 105 def has_proxy(self): 106 """Checks whether manager proxy can be acquired.""" 107 return bool(self.proxy()) 108 109 def proxy(self): 110 """Gets proxy object to manager interface for method calls.""" 111 return self.bus.get(self.MGR_SERVICE, 112 self.MGR_OBJECT)[self.MGR_INTERFACE] 113 114 @glib_call(False) 115 def register_callbacks(self): 116 """Registers manager callbacks for this client if one doesn't already exist. 117 """ 118 # Callbacks already registered 119 if self.callbacks: 120 return True 121 122 # Generate a random number between 1-1000 123 rnumber = math.floor(random.random() * 1000 + 1) 124 125 # Create and publish callbacks 126 self.callbacks = self.ExportedManagerCallbacks() 127 self.callbacks.add_observer('manager_client', self) 128 objpath = self.CB_EXPORTED_OBJ.format(rnumber) 129 self.bus.register_object(objpath, self.callbacks, None) 130 131 # Register published callbacks with manager daemon 132 self.proxy().RegisterCallback(objpath) 133 134 return True 135 136 @glib_callback() 137 def on_hci_device_changed(self, hci, present): 138 """Handle device presence change.""" 139 if present: 140 self.adapters[hci] = self.adapters.get(hci, False) 141 elif hci in self.adapters: 142 del self.adapters[hci] 143 144 @glib_callback() 145 def on_hci_enabled_changed(self, hci, enabled): 146 """Handle device enabled change.""" 147 self.adapters[hci] = enabled 148 149 def get_default_adapter(self): 150 """Get the default adapter in use by the manager.""" 151 # TODO(abps): The default adapter is hci0 until we support multiple 152 # adapters. 153 return 0 154 155 def has_default_adapter(self): 156 """Checks whether the default adapter exists on this system.""" 157 return self.get_default_adapter() in self.adapters 158 159 @glib_call() 160 def start(self, hci): 161 """Start a specific adapter.""" 162 self.proxy().Start(hci) 163 164 @glib_call() 165 def stop(self, hci): 166 """Stop a specific adapter.""" 167 self.proxy().Stop(hci) 168 169 @glib_call(False) 170 def get_adapter_enabled(self, hci): 171 """Checks whether a specific adapter is enabled (i.e. started).""" 172 return bool(self.proxy().GetAdapterEnabled(hci)) 173 174 @glib_call(False) 175 def get_floss_enabled(self): 176 """Gets whether Floss is enabled.""" 177 return bool(self.proxy().GetFlossEnabled()) 178 179 @glib_call() 180 def set_floss_enabled(self, enabled): 181 self.proxy().SetFlossEnabled(enabled) 182 183 @glib_call([]) 184 def get_available_adapters(self): 185 """Gets a list of currently available adapters and if they are enabled. 186 """ 187 all_adapters = [] 188 dbus_result = self.proxy().GetAvailableAdapters() 189 190 for d in dbus_result: 191 if 'hci_interface' in d and 'enabled' in d: 192 all_adapters.append( 193 (int(d['hci_interface']), bool(d['enabled']))) 194 else: 195 raise FlossManagerClient.AdaptersNotParseable( 196 'Could not parse: {}', str(d)) 197 198 # This function call overwrites any existing cached values of 199 # self.adapters that we may have gotten from observers. 200 self.adapters = {} 201 for (hci, enabled) in all_adapters: 202 self.adapters[hci] = enabled 203 204 return all_adapters 205