xref: /aosp_15_r20/external/autotest/client/cros/bluetooth/floss/manager_client.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2# Copyright 2021 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5"""Client class to access the Floss manager interface."""
6
7from __future__ import absolute_import
8from __future__ import division
9from __future__ import print_function
10
11import math
12import random
13
14from autotest_lib.client.cros.bluetooth.floss.observer_base import ObserverBase
15from autotest_lib.client.cros.bluetooth.floss.utils import glib_call, glib_callback
16
17
class ManagerCallbacks:
    """Observer interface for events emitted by the Manager interface.

    Subclass this and override the hooks below to be notified once the
    callbacks have been exported via register_callback. Every hook is a
    no-op by default, so subclasses only override what they care about.
    """

    def on_hci_device_changed(self, hci, present):
        """Called when the presence of an hci device is updated.

        @param hci: Hci interface number.
        @param present: True if the hci interface is appearing, False if it
                        is disappearing.
        """

    def on_hci_enabled_changed(self, hci, enabled):
        """Called when an hci device is being enabled or disabled.

        @param hci: Hci interface number.
        @param enabled: True if the hci interface is being enabled, False if
                        it is being disabled.
        """
39
40
class FlossManagerClient(ManagerCallbacks):
    """Handles method calls to and callbacks from the Manager interface.

    The client keeps a cache, `self.adapters`, mapping hci interface number
    to its enabled state. The cache is updated by the on_hci_* callbacks and
    is replaced wholesale by get_available_adapters.

    NOTE(review): most methods are wrapped by glib_call(<value>); presumably
    the decorator argument is the value returned when the call fails --
    confirm against floss/utils.py.
    """

    MGR_SERVICE = 'org.chromium.bluetooth.Manager'
    MGR_INTERFACE = 'org.chromium.bluetooth.Manager'
    MGR_OBJECT = '/org/chromium/bluetooth/Manager'

    # Exported callback interface and objects. The `{}` in the object path is
    # filled with a per-client number in register_callbacks.
    CB_EXPORTED_INTF = 'org.chromium.bluetooth.ManagerCallbacks'
    CB_EXPORTED_OBJ = '/org/chromium/bluetooth/test_manager_client{}'

    class AdaptersNotParseable(Exception):
        """An entry in the result of GetAvailableAdapters was not parseable."""
        pass

    # NOTE(review): the docstring below is the DBus node description for the
    # exported object. Since register_callbacks passes node_info=None to
    # bus.register_object, it is presumably read at runtime by the bus
    # library -- do not reword it; confirm against the bus implementation.
    class ExportedManagerCallbacks(ObserverBase):
        """
        <node>
            <interface name="org.chromium.bluetooth.ManagerCallbacks">
                <method name="OnHciDeviceChanged">
                    <arg type="i" name="hci" direction="in" />
                    <arg type="b" name="present" direction="in" />
                </method>
                <method name="OnHciEnabledChanged">
                    <arg type="i" name="hci" direction="in" />
                    <arg type="b" name="enabled" direction="in" />
                </method>
            </interface>
        </node>
        """
        def __init__(self):
            """Construct exported callbacks object.
            """
            ObserverBase.__init__(self)

        def OnHciDeviceChanged(self, hci, present):
            """Forward a device presence callback to every observer.

            @param hci: Hci interface number.
            @param present: Whether the interface is appearing or disappearing.
            """
            for observer in self.observers.values():
                observer.on_hci_device_changed(hci, present)

        def OnHciEnabledChanged(self, hci, enabled):
            """Forward a device enabled callback to every observer.

            @param hci: Hci interface number.
            @param enabled: Whether the interface is being enabled or disabled.
            """
            for observer in self.observers.values():
                observer.on_hci_enabled_changed(hci, enabled)

    def __init__(self, bus):
        """Construct the client.

        @param bus: DBus bus over which we'll establish connections.
        """
        self.bus = bus

        # We don't register callbacks by default. The client owner must call
        # register_callbacks to do so.
        self.callbacks = None

        # Initialize hci devices and their power states (hci -> enabled).
        self.adapters = {}

    def __del__(self):
        """Destructor: drop the reference to the exported callbacks object."""
        # Assign instead of `del self.callbacks` so that destruction of an
        # instance whose attribute was never set does not raise
        # AttributeError from within __del__.
        self.callbacks = None

    @glib_call(False)
    def has_proxy(self):
        """Checks whether manager proxy can be acquired.

        @return: Truthiness of the proxy object returned by proxy().
        """
        return bool(self.proxy())

    def proxy(self):
        """Gets proxy object to manager interface for method calls.

        @return: The MGR_INTERFACE view of the MGR_OBJECT object owned by
                 MGR_SERVICE on this client's bus.
        """
        return self.bus.get(self.MGR_SERVICE,
                            self.MGR_OBJECT)[self.MGR_INTERFACE]

    @glib_call(False)
    def register_callbacks(self):
        """Registers manager callbacks for this client if one doesn't already exist.

        @return: True once callbacks are exported and registered (or were
                 already registered).
        """
        # Callbacks already registered
        if self.callbacks:
            return True

        # Pick an integer in [1, 1000] to make the exported object path
        # distinct. randint always yields an int; math.floor() yields a
        # float on Python 2, which would render as e.g. "123.0" and put a
        # '.' into the object path.
        rnumber = random.randint(1, 1000)

        # Create and publish callbacks
        # NOTE(review): self.callbacks is set before RegisterCallback runs; if
        # that call fails, a later register_callbacks() will report True
        # without retrying. Confirm whether that is intended.
        self.callbacks = self.ExportedManagerCallbacks()
        self.callbacks.add_observer('manager_client', self)
        objpath = self.CB_EXPORTED_OBJ.format(rnumber)
        self.bus.register_object(objpath, self.callbacks, None)

        # Register published callbacks with manager daemon
        self.proxy().RegisterCallback(objpath)

        return True

    @glib_callback()
    def on_hci_device_changed(self, hci, present):
        """Handle device presence change.

        A newly present adapter is cached as disabled unless an enabled state
        is already cached for it; a removed adapter is dropped from the cache.

        @param hci: Hci interface number.
        @param present: Whether the interface is appearing or disappearing.
        """
        if present:
            self.adapters.setdefault(hci, False)
        else:
            self.adapters.pop(hci, None)

    @glib_callback()
    def on_hci_enabled_changed(self, hci, enabled):
        """Handle device enabled change by caching the new state.

        @param hci: Hci interface number.
        @param enabled: Whether the interface is being enabled or disabled.
        """
        self.adapters[hci] = enabled

    def get_default_adapter(self):
        """Get the default adapter in use by the manager.

        @return: Hci interface number of the default adapter.
        """
        # TODO(abps): The default adapter is hci0 until we support multiple
        #             adapters.
        return 0

    def has_default_adapter(self):
        """Checks whether the default adapter exists on this system.

        Only consults the local adapter cache; it does not query the daemon.
        """
        return self.get_default_adapter() in self.adapters

    @glib_call()
    def start(self, hci):
        """Start a specific adapter.

        @param hci: Hci interface number.
        """
        self.proxy().Start(hci)

    @glib_call()
    def stop(self, hci):
        """Stop a specific adapter.

        @param hci: Hci interface number.
        """
        self.proxy().Stop(hci)

    @glib_call(False)
    def get_adapter_enabled(self, hci):
        """Checks whether a specific adapter is enabled (i.e. started).

        @param hci: Hci interface number.
        """
        return bool(self.proxy().GetAdapterEnabled(hci))

    @glib_call(False)
    def get_floss_enabled(self):
        """Gets whether Floss is enabled."""
        return bool(self.proxy().GetFlossEnabled())

    @glib_call()
    def set_floss_enabled(self, enabled):
        """Sets whether Floss is enabled.

        @param enabled: Value passed through to SetFlossEnabled.
        """
        self.proxy().SetFlossEnabled(enabled)

    @glib_call([])
    def get_available_adapters(self):
        """Gets a list of currently available adapters and if they are enabled.

        Also replaces the cached self.adapters with the returned data.

        @return: List of (hci, enabled) tuples.
        @raises AdaptersNotParseable: If an entry lacks 'hci_interface' or
                                      'enabled'.
        """
        all_adapters = []
        dbus_result = self.proxy().GetAvailableAdapters()

        for d in dbus_result:
            if 'hci_interface' not in d or 'enabled' not in d:
                # Actually interpolate the offending entry into the message;
                # passing it as a second exception argument left the '{}'
                # placeholder unformatted.
                raise FlossManagerClient.AdaptersNotParseable(
                        'Could not parse: {}'.format(str(d)))
            all_adapters.append((int(d['hci_interface']), bool(d['enabled'])))

        # This function call overwrites any existing cached values of
        # self.adapters that we may have gotten from observers.
        self.adapters = dict(all_adapters)

        return all_adapters
205