1# Copyright 2024 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""L2CAP grpc interface."""
15
16import asyncio
17import logging
18import os
19import socket as socket_module
20
21from floss.pandora.floss import floss_enums
22from floss.pandora.floss import socket_manager
23from floss.pandora.floss import utils
24from floss.pandora.server import bluetooth as bluetooth_module
25from google.protobuf import empty_pb2
26import grpc
27from pandora_experimental import l2cap_grpc_aio
28from pandora_experimental import l2cap_pb2
29
30
31class L2CAPService(l2cap_grpc_aio.L2CAPServicer):
32    """Service to trigger Bluetooth L2CAP procedures.
33
34    This class implements the Pandora bluetooth test interfaces,
35    where the meta class definition is automatically generated by the protobuf.
36    The interface definition can be found in:
37    https://cs.android.com/android/platform/superproject/main/+/main:packages/modules/Bluetooth/pandora/interfaces/pandora_experimental/l2cap.proto
38    """
39
40    # Size of the buffer for data transactions.
41    BUFFER_SIZE = 512
42
43    def __init__(self, bluetooth: bluetooth_module.Bluetooth):
44        self.bluetooth = bluetooth
45
46        # key = connection_value, val = (socket_id, [stream])
47        self.incoming_connections = dict()
48
49        # key = connection_value, val = current_index
50        # In L2CAP/COS/ECFC/BV-03-C an incoming connection_value could be associated with multiple streams and the
51        # streams would be used in an interleave manner. This dict records the current index and would be updated each
52        # time SendData and ReceiveData are called.
53        self.interleave_connection_current_index = dict()
54
55        # key = connection_value, val = stream
56        self.outgoing_connections = dict()
57
58    async def ListenL2CAPChannel(self, request: l2cap_pb2.ListenL2CAPChannelRequest,
59                                 context: grpc.ServicerContext) -> l2cap_pb2.ListenL2CAPChannelResponse:
60
61        class ListenL2CAPObserver(socket_manager.SocketManagerCallbacks):
62            """Observer to observe listening on the L2CAP channel."""
63
64            def __init__(self, task):
65                self.task = task
66
67            @utils.glib_callback()
68            def on_incoming_socket_ready(self, socket, status):
69                if not socket or 'id' not in socket:
70                    return
71
72                socket_id = socket['id']
73                if socket_id != self.task['socket_id']:
74                    return
75
76                if status is None or floss_enums.BtStatus(status) != floss_enums.BtStatus.SUCCESS:
77                    logging.error('Failed to listen on the L2CAP channel with socket_id: %s. Status: %s', socket_id,
78                                  status)
79
80                future = self.task['listen_l2cap_channel']
81                future.get_loop().call_soon_threadsafe(future.set_result, (status, socket))
82
83        try:
84            if not request.secure:
85                channel_type = 'insecure'
86                socket_result = self.bluetooth.listen_using_insecure_l2cap_channel()
87            else:
88                channel_type = 'secure'
89                socket_result = self.bluetooth.listen_using_l2cap_channel()
90
91            if socket_result is None:
92                await context.abort(grpc.StatusCode.INTERNAL,
93                                    f'Failed to call listen_using_{channel_type}_l2cap_channel.')
94
95            socket_id = socket_result['id']
96            l2cap_channel_listener = {
97                'listen_l2cap_channel': asyncio.get_running_loop().create_future(),
98                'socket_id': socket_id
99            }
100            observer = ListenL2CAPObserver(l2cap_channel_listener)
101            name = utils.create_observer_name(observer)
102            self.bluetooth.socket_manager.register_callback_observer(name, observer)
103            status, socket = await asyncio.wait_for(l2cap_channel_listener['listen_l2cap_channel'], timeout=5)
104            if status != floss_enums.BtStatus.SUCCESS:
105                await context.abort(
106                    grpc.StatusCode.INTERNAL,
107                    f'Failed to listen on the L2CAP channel with socket_id: {socket_id}. Status: {status}')
108
109            self.incoming_connections[request.connection.cookie.value] = (socket['id'], [])
110        finally:
111            self.bluetooth.socket_manager.unregister_callback_observer(name, observer)
112
113        return empty_pb2.Empty()
114
115    async def CreateLECreditBasedChannel(self, request: l2cap_pb2.CreateLECreditBasedChannelRequest,
116                                         context: grpc.ServicerContext) -> l2cap_pb2.CreateLECreditBasedChannelResponse:
117
118        class CreateL2CAPObserver(socket_manager.SocketManagerCallbacks):
119            """Observer to observe the creation of the L2CAP channel."""
120
121            def __init__(self, task):
122                self.task = task
123
124            @utils.glib_callback()
125            def on_outgoing_connection_result(self, connecting_id, result, socket, *, dbus_unix_fd_list=None):
126                if connecting_id != self.task['connecting_id']:
127                    return
128
129                future = self.task['create_l2cap_channel']
130                if result is None or floss_enums.BtStatus(result) != floss_enums.BtStatus.SUCCESS:
131                    logging.error('Failed to create the L2CAP channel with connecting_id: %s. Status: %s',
132                                  connecting_id, result)
133                    future.get_loop().call_soon_threadsafe(future.set_result, None)
134                    return
135
136                if not socket:
137                    future.get_loop().call_soon_threadsafe(future.set_result, None)
138                    return
139
140                optional_fd = socket['optional_value']['fd']
141                if not optional_fd:
142                    future.get_loop().call_soon_threadsafe(future.set_result, None)
143                    return
144
145                if not dbus_unix_fd_list or dbus_unix_fd_list.get_length() < 1:
146                    logging.error('on_outgoing_connection_result: Empty fd list')
147                    future.get_loop().call_soon_threadsafe(future.set_result, None)
148                    return
149
150                fd_handle = optional_fd['optional_value']
151                if fd_handle > dbus_unix_fd_list.get_length():
152                    logging.error('on_outgoing_connection_result: Invalid fd handle')
153                    future.get_loop().call_soon_threadsafe(future.set_result, None)
154                    return
155
156                fd = dbus_unix_fd_list.get(fd_handle)
157                fd_dup = os.dup(fd)
158                future.get_loop().call_soon_threadsafe(future.set_result, fd_dup)
159
160        connection_value = request.connection.cookie.value
161        address = utils.address_from(connection_value)
162        try:
163            if not request.secure:
164                channel_type = 'insecure'
165                socket_result = self.bluetooth.create_insecure_l2cap_channel(address, request.psm)
166            else:
167                channel_type = 'secure'
168                socket_result = self.bluetooth.create_l2cap_channel(address, request.psm)
169
170            if socket_result is None:
171                await context.abort(grpc.StatusCode.INTERNAL, f'Failed to call create_{channel_type}_l2cap_channel.')
172
173            connecting_id = socket_result['id']
174            l2cap_channel_creation = {
175                'create_l2cap_channel': asyncio.get_running_loop().create_future(),
176                'connecting_id': connecting_id
177            }
178            observer = CreateL2CAPObserver(l2cap_channel_creation)
179            name = utils.create_observer_name(observer)
180            self.bluetooth.socket_manager.register_callback_observer(name, observer)
181            fd = await asyncio.wait_for(l2cap_channel_creation['create_l2cap_channel'], timeout=5)
182            if fd is None:
183                await context.abort(grpc.StatusCode.INTERNAL,
184                                    f'Failed to get the fd from L2CAP socket with connecting_id: {connecting_id}')
185
186            stream = socket_module.fromfd(fd, socket_module.AF_UNIX, socket_module.SOCK_STREAM)
187            self.outgoing_connections[connection_value] = stream
188        finally:
189            self.bluetooth.socket_manager.unregister_callback_observer(name, observer)
190
191        return empty_pb2.Empty()
192
193    async def AcceptL2CAPChannel(self, request: l2cap_pb2.AcceptL2CAPChannelRequest,
194                                 context: grpc.ServicerContext) -> l2cap_pb2.AcceptL2CAPChannelResponse:
195
196        class AcceptL2CAPObserver(socket_manager.SocketManagerCallbacks):
197            """Observer to observe the acceptance of the L2CAP channel."""
198
199            def __init__(self, task):
200                self.task = task
201
202            @utils.glib_callback()
203            def on_handle_incoming_connection(self, listener_id, connection, *, dbus_unix_fd_list=None):
204                if listener_id != self.task['listener_id']:
205                    return
206
207                future = self.task['accept_l2cap_channel']
208                if not connection:
209                    future.get_loop().call_soon_threadsafe(future.set_result, None)
210                    return
211
212                optional_fd = connection['fd']
213                if not optional_fd:
214                    future.get_loop().call_soon_threadsafe(future.set_result, None)
215                    return
216
217                if not dbus_unix_fd_list or dbus_unix_fd_list.get_length() < 1:
218                    logging.error('on_handle_incoming_connection: Empty fd list')
219                    future.get_loop().call_soon_threadsafe(future.set_result, None)
220                    return
221
222                fd_handle = optional_fd['optional_value']
223                if fd_handle > dbus_unix_fd_list.get_length():
224                    logging.error('on_handle_incoming_connection: Invalid fd handle')
225                    future.get_loop().call_soon_threadsafe(future.set_result, None)
226                    return
227
228                fd = dbus_unix_fd_list.get(fd_handle)
229                fd_dup = os.dup(fd)
230                future.get_loop().call_soon_threadsafe(future.set_result, fd_dup)
231
232        connection_value = request.connection.cookie.value
233        socket_tuple = self.incoming_connections.get(connection_value)
234        if socket_tuple is None:
235            return empty_pb2.Empty()
236
237        try:
238            socket_id, incoming_streams = socket_tuple
239            l2cap_channel_acceptance = {
240                'accept_l2cap_channel': asyncio.get_running_loop().create_future(),
241                'listener_id': socket_id
242            }
243            observer = AcceptL2CAPObserver(l2cap_channel_acceptance)
244            name = utils.create_observer_name(observer)
245            self.bluetooth.socket_manager.register_callback_observer(name, observer)
246            accept_socket_status = self.bluetooth.accept_socket(socket_id, timeout_ms=5)
247            if accept_socket_status != floss_enums.BtStatus.SUCCESS:
248                await context.abort(
249                    grpc.StatusCode.INTERNAL,
250                    f'Failed to accept the L2CAP socket with socket_id: {socket_id}. Status: {accept_socket_status}.')
251
252            fd = await asyncio.wait_for(l2cap_channel_acceptance['accept_l2cap_channel'], timeout=5)
253            if fd is None:
254                await context.abort(grpc.StatusCode.INTERNAL,
255                                    f'Failed to get the fd from L2CAP socket with socket_id: {socket_id}')
256
257            stream = socket_module.fromfd(fd, socket_module.AF_UNIX, socket_module.SOCK_STREAM)
258            incoming_streams.append(stream)
259        finally:
260            self.bluetooth.socket_manager.unregister_callback_observer(name, observer)
261
262        return empty_pb2.Empty()
263
264    def _interleave_get_incoming_stream(self, connection_value):
265        """Gets the incoming stream in an interleave manner."""
266        socket_tuple = self.incoming_connections.get(connection_value)
267        if socket_tuple is None:
268            logging.error('Invalid connection_value: %s', connection_value)
269            return None
270
271        _, streams = socket_tuple
272        if not streams:
273            logging.error('No incoming stream available for connection_value: %s', connection_value)
274            return None
275
276        current_index = self.interleave_connection_current_index.get(connection_value, 0)
277        self.interleave_connection_current_index[connection_value] = (current_index + 1) % len(streams)
278        return streams[current_index]
279
280    async def SendData(self, request: l2cap_pb2.SendDataRequest,
281                       context: grpc.ServicerContext) -> l2cap_pb2.SendDataResponse:
282        connection_value = request.connection.cookie.value
283        output_stream = self.outgoing_connections.get(connection_value)
284        if output_stream is None:
285            output_stream = self._interleave_get_incoming_stream(connection_value)
286
287        if output_stream:
288            try:
289                output_stream.send(request.data)
290            except Exception as e:
291                logging.error('Exception during writing to output stream: %s', e)
292        else:
293            logging.error('Output stream: %s not found for the connection_value: %s', output_stream, connection_value)
294
295        return empty_pb2.Empty()
296
297    async def ReceiveData(self, request: l2cap_pb2.ReceiveDataRequest,
298                          context: grpc.ServicerContext) -> l2cap_pb2.ReceiveDataResponse:
299        connection_value = request.connection.cookie.value
300        input_stream = self.outgoing_connections.get(connection_value)
301        if input_stream is None:
302            input_stream = self._interleave_get_incoming_stream(connection_value)
303
304        if input_stream:
305            try:
306                data = input_stream.recv(self.BUFFER_SIZE)
307                if data:
308                    return l2cap_pb2.ReceiveDataResponse(data=bytes(data))
309            except Exception as e:
310                logging.error('Exception during reading from input stream: %s', e)
311        else:
312            logging.error('Input stream: %s not found for the connection_value: %s', input_stream, connection_value)
313
314        # Return an empty byte array.
315        return l2cap_pb2.ReceiveDataResponse(data=b'')
316