# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""L2CAP grpc interface."""

import asyncio
import logging
import os
import socket as socket_module

from floss.pandora.floss import floss_enums
from floss.pandora.floss import socket_manager
from floss.pandora.floss import utils
from floss.pandora.server import bluetooth as bluetooth_module
from google.protobuf import empty_pb2
import grpc
from pandora_experimental import l2cap_grpc_aio
from pandora_experimental import l2cap_pb2


def _set_result_threadsafe(future, result, *, discard=None):
    """Resolves `future` with `result` from a thread other than the event loop's.

    The result is only set if the future is still pending, so a callback that fires twice or after the awaiting
    coroutine has timed out (which cancels the future) does not raise InvalidStateError inside the loop.

    Args:
        future: The asyncio future to resolve.
        result: The value to set on the future.
        discard: Optional callable invoked with `result` when the future is already done and `result` is not None,
            so that an unused resource (e.g. a file descriptor) can be released.
    """

    def _resolve():
        if not future.done():
            future.set_result(result)
        elif discard is not None and result is not None:
            discard(result)

    future.get_loop().call_soon_threadsafe(_resolve)


def _dup_fd_from_list(optional_fd, dbus_unix_fd_list, callback_name):
    """Duplicates the file descriptor that `optional_fd` refers to inside `dbus_unix_fd_list`.

    Args:
        optional_fd: Mapping whose 'optional_value' entry is the index of the fd inside the fd list.
        dbus_unix_fd_list: Object exposing get_length() and get(index), as passed to the socket manager callbacks.
        callback_name: Name of the calling callback, used as the prefix of the error logs.

    Returns:
        A duplicated file descriptor owned by the caller, or None if no valid descriptor is available.
    """
    if not optional_fd:
        return None

    if not dbus_unix_fd_list or dbus_unix_fd_list.get_length() < 1:
        logging.error('%s: Empty fd list', callback_name)
        return None

    fd_handle = optional_fd['optional_value']
    # The handle is an index into the list, so the valid range is [0, length).
    if not 0 <= fd_handle < dbus_unix_fd_list.get_length():
        logging.error('%s: Invalid fd handle', callback_name)
        return None

    # NOTE(review): if dbus_unix_fd_list is a Gio.UnixFDList, get() presumably already returns a duplicate that the
    # caller must close, in which case `fd` is leaked here - confirm against the D-Bus client in use.
    fd = dbus_unix_fd_list.get(fd_handle)
    return os.dup(fd)


def _stream_from_fd(fd):
    """Wraps `fd` in a socket object and releases `fd` itself.

    socket.fromfd() duplicates the descriptor it is given, so the original descriptor has to be closed here;
    otherwise every established channel would leak one file descriptor.
    """
    try:
        return socket_module.fromfd(fd, socket_module.AF_UNIX, socket_module.SOCK_STREAM)
    finally:
        os.close(fd)


class L2CAPService(l2cap_grpc_aio.L2CAPServicer):
    """Service to trigger Bluetooth L2CAP procedures.

    This class implements the Pandora bluetooth test interfaces,
    where the meta class definition is automatically generated by the protobuf.
    The interface definition can be found in:
    https://cs.android.com/android/platform/superproject/main/+/main:packages/modules/Bluetooth/pandora/interfaces/pandora_experimental/l2cap.proto
    """

    # Size of the buffer for data transactions.
    BUFFER_SIZE = 512

    def __init__(self, bluetooth: bluetooth_module.Bluetooth):
        self.bluetooth = bluetooth

        # key = connection_value, val = (socket_id, [stream])
        self.incoming_connections = {}

        # key = connection_value, val = current_index
        # In L2CAP/COS/ECFC/BV-03-C an incoming connection_value could be associated with multiple streams and the
        # streams would be used in an interleaved manner. This dict records the current index and is updated each
        # time SendData and ReceiveData are called.
        self.interleave_connection_current_index = {}

        # key = connection_value, val = stream
        self.outgoing_connections = {}

    async def ListenL2CAPChannel(self, request: l2cap_pb2.ListenL2CAPChannelRequest,
                                 context: grpc.ServicerContext) -> l2cap_pb2.ListenL2CAPChannelResponse:
        """Starts listening on a (secure or insecure) L2CAP channel.

        On success, the listening socket id is recorded under the request's connection cookie value with an empty
        list of accepted streams.

        Args:
            request: Request carrying the `secure` flag and the connection cookie.
            context: gRPC context; the RPC is aborted with INTERNAL if listening fails.

        Returns:
            An empty message.

        Raises:
            asyncio.TimeoutError: If the socket-ready callback does not arrive within 5 seconds.
        """

        class ListenL2CAPObserver(socket_manager.SocketManagerCallbacks):
            """Observer to observe listening on the L2CAP channel."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_incoming_socket_ready(self, socket, status):
                if not socket or 'id' not in socket:
                    return

                socket_id = socket['id']
                if socket_id != self.task['socket_id']:
                    return

                if status is None or floss_enums.BtStatus(status) != floss_enums.BtStatus.SUCCESS:
                    logging.error('Failed to listen on the L2CAP channel with socket_id: %s. Status: %s', socket_id,
                                  status)

                _set_result_threadsafe(self.task['listen_l2cap_channel'], (status, socket))

        if request.secure:
            channel_type = 'secure'
            socket_result = self.bluetooth.listen_using_l2cap_channel()
        else:
            channel_type = 'insecure'
            socket_result = self.bluetooth.listen_using_insecure_l2cap_channel()

        # Abort before any observer exists, so that there is nothing to clean up on this path.
        if socket_result is None:
            await context.abort(grpc.StatusCode.INTERNAL,
                                f'Failed to call listen_using_{channel_type}_l2cap_channel.')

        socket_id = socket_result['id']
        l2cap_channel_listener = {
            'listen_l2cap_channel': asyncio.get_running_loop().create_future(),
            'socket_id': socket_id
        }
        observer = ListenL2CAPObserver(l2cap_channel_listener)
        name = utils.create_observer_name(observer)
        self.bluetooth.socket_manager.register_callback_observer(name, observer)
        try:
            status, socket = await asyncio.wait_for(l2cap_channel_listener['listen_l2cap_channel'], timeout=5)
            if status != floss_enums.BtStatus.SUCCESS:
                await context.abort(
                    grpc.StatusCode.INTERNAL,
                    f'Failed to listen on the L2CAP channel with socket_id: {socket_id}. Status: {status}')

            connection_value = request.connection.cookie.value
            self.incoming_connections[connection_value] = (socket['id'], [])
            # The stream list starts over, so a previously saved interleave index is no longer meaningful.
            self.interleave_connection_current_index.pop(connection_value, None)
        finally:
            self.bluetooth.socket_manager.unregister_callback_observer(name, observer)

        return empty_pb2.Empty()

    async def CreateLECreditBasedChannel(self, request: l2cap_pb2.CreateLECreditBasedChannelRequest,
                                         context: grpc.ServicerContext) -> l2cap_pb2.CreateLECreditBasedChannelResponse:
        """Creates an outgoing (secure or insecure) L2CAP channel to the peer.

        On success, the resulting stream is stored in `outgoing_connections` under the request's connection cookie
        value.

        Args:
            request: Request carrying the `secure` flag, the PSM and the connection cookie.
            context: gRPC context; the RPC is aborted with INTERNAL if the channel cannot be created.

        Returns:
            An empty message.

        Raises:
            asyncio.TimeoutError: If the connection-result callback does not arrive within 5 seconds.
        """

        class CreateL2CAPObserver(socket_manager.SocketManagerCallbacks):
            """Observer to observe the creation of the L2CAP channel."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_outgoing_connection_result(self, connecting_id, result, socket, *, dbus_unix_fd_list=None):
                if connecting_id != self.task['connecting_id']:
                    return

                future = self.task['create_l2cap_channel']
                if result is None or floss_enums.BtStatus(result) != floss_enums.BtStatus.SUCCESS:
                    logging.error('Failed to create the L2CAP channel with connecting_id: %s. Status: %s',
                                  connecting_id, result)
                    _set_result_threadsafe(future, None)
                    return

                if not socket:
                    _set_result_threadsafe(future, None)
                    return

                fd_dup = _dup_fd_from_list(socket['optional_value']['fd'], dbus_unix_fd_list,
                                           'on_outgoing_connection_result')
                # Close the duplicate if nobody is waiting for it anymore.
                _set_result_threadsafe(future, fd_dup, discard=os.close)

        connection_value = request.connection.cookie.value
        address = utils.address_from(connection_value)
        if request.secure:
            channel_type = 'secure'
            socket_result = self.bluetooth.create_l2cap_channel(address, request.psm)
        else:
            channel_type = 'insecure'
            socket_result = self.bluetooth.create_insecure_l2cap_channel(address, request.psm)

        # Abort before any observer exists, so that there is nothing to clean up on this path.
        if socket_result is None:
            await context.abort(grpc.StatusCode.INTERNAL, f'Failed to call create_{channel_type}_l2cap_channel.')

        connecting_id = socket_result['id']
        l2cap_channel_creation = {
            'create_l2cap_channel': asyncio.get_running_loop().create_future(),
            'connecting_id': connecting_id
        }
        observer = CreateL2CAPObserver(l2cap_channel_creation)
        name = utils.create_observer_name(observer)
        self.bluetooth.socket_manager.register_callback_observer(name, observer)
        try:
            fd = await asyncio.wait_for(l2cap_channel_creation['create_l2cap_channel'], timeout=5)
            if fd is None:
                await context.abort(grpc.StatusCode.INTERNAL,
                                    f'Failed to get the fd from L2CAP socket with connecting_id: {connecting_id}')

            self.outgoing_connections[connection_value] = _stream_from_fd(fd)
        finally:
            self.bluetooth.socket_manager.unregister_callback_observer(name, observer)

        return empty_pb2.Empty()

    async def AcceptL2CAPChannel(self, request: l2cap_pb2.AcceptL2CAPChannelRequest,
                                 context: grpc.ServicerContext) -> l2cap_pb2.AcceptL2CAPChannelResponse:
        """Accepts an incoming connection on a channel previously set up by ListenL2CAPChannel.

        The accepted stream is appended to the list of incoming streams of the request's connection cookie value.
        If no listening socket is known for that value, nothing is done.

        Args:
            request: Request carrying the connection cookie.
            context: gRPC context; the RPC is aborted with INTERNAL if accepting fails.

        Returns:
            An empty message.

        Raises:
            asyncio.TimeoutError: If the incoming-connection callback does not arrive within 5 seconds.
        """

        class AcceptL2CAPObserver(socket_manager.SocketManagerCallbacks):
            """Observer to observe the acceptance of the L2CAP channel."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_handle_incoming_connection(self, listener_id, connection, *, dbus_unix_fd_list=None):
                if listener_id != self.task['listener_id']:
                    return

                future = self.task['accept_l2cap_channel']
                if not connection:
                    _set_result_threadsafe(future, None)
                    return

                fd_dup = _dup_fd_from_list(connection['fd'], dbus_unix_fd_list, 'on_handle_incoming_connection')
                # Close the duplicate if nobody is waiting for it anymore.
                _set_result_threadsafe(future, fd_dup, discard=os.close)

        connection_value = request.connection.cookie.value
        socket_tuple = self.incoming_connections.get(connection_value)
        if socket_tuple is None:
            return empty_pb2.Empty()

        socket_id, incoming_streams = socket_tuple
        l2cap_channel_acceptance = {
            'accept_l2cap_channel': asyncio.get_running_loop().create_future(),
            'listener_id': socket_id
        }
        observer = AcceptL2CAPObserver(l2cap_channel_acceptance)
        name = utils.create_observer_name(observer)
        self.bluetooth.socket_manager.register_callback_observer(name, observer)
        try:
            # NOTE(review): timeout_ms=5 is kept as in the original, while the wait below is 5 seconds - confirm the
            # intended unit.
            accept_socket_status = self.bluetooth.accept_socket(socket_id, timeout_ms=5)
            if accept_socket_status != floss_enums.BtStatus.SUCCESS:
                await context.abort(
                    grpc.StatusCode.INTERNAL,
                    f'Failed to accept the L2CAP socket with socket_id: {socket_id}. Status: {accept_socket_status}.')

            fd = await asyncio.wait_for(l2cap_channel_acceptance['accept_l2cap_channel'], timeout=5)
            if fd is None:
                await context.abort(grpc.StatusCode.INTERNAL,
                                    f'Failed to get the fd from L2CAP socket with socket_id: {socket_id}')

            incoming_streams.append(_stream_from_fd(fd))
        finally:
            self.bluetooth.socket_manager.unregister_callback_observer(name, observer)

        return empty_pb2.Empty()

    def _interleave_get_incoming_stream(self, connection_value):
        """Gets the incoming stream in an interleaved manner.

        Each call returns the stream at the saved index for `connection_value` and advances the index, wrapping
        around at the end of the stream list.

        Args:
            connection_value: The connection cookie value identifying the incoming connection.

        Returns:
            The selected stream, or None if the connection is unknown or has no accepted stream yet.
        """
        socket_tuple = self.incoming_connections.get(connection_value)
        if socket_tuple is None:
            logging.error('Invalid connection_value: %s', connection_value)
            return None

        _, streams = socket_tuple
        if not streams:
            logging.error('No incoming stream available for connection_value: %s', connection_value)
            return None

        # The modulo keeps a stale index in range if the stream list was replaced by a shorter one.
        current_index = self.interleave_connection_current_index.get(connection_value, 0) % len(streams)
        self.interleave_connection_current_index[connection_value] = (current_index + 1) % len(streams)
        return streams[current_index]

    async def SendData(self, request: l2cap_pb2.SendDataRequest,
                       context: grpc.ServicerContext) -> l2cap_pb2.SendDataResponse:
        """Writes the request's data to the stream associated with the connection.

        The outgoing stream is preferred; otherwise the next incoming stream is used. Failures are logged and the
        RPC still completes normally.

        Args:
            request: Request carrying the connection cookie and the data to send.
            context: gRPC context (unused).

        Returns:
            An empty message.
        """
        connection_value = request.connection.cookie.value
        output_stream = self.outgoing_connections.get(connection_value)
        if output_stream is None:
            output_stream = self._interleave_get_incoming_stream(connection_value)

        if output_stream:
            try:
                # sendall() keeps writing until the whole payload is sent; send() may write only part of it.
                # NOTE(review): this call can block the event loop while the socket is not writable.
                output_stream.sendall(request.data)
            except Exception as e:
                logging.error('Exception during writing to output stream: %s', e)
        else:
            logging.error('Output stream: %s not found for the connection_value: %s', output_stream, connection_value)

        return empty_pb2.Empty()

    async def ReceiveData(self, request: l2cap_pb2.ReceiveDataRequest,
                          context: grpc.ServicerContext) -> l2cap_pb2.ReceiveDataResponse:
        """Reads up to BUFFER_SIZE bytes from the stream associated with the connection.

        The outgoing stream is preferred; otherwise the next incoming stream is used. Failures are logged and an
        empty payload is returned.

        Args:
            request: Request carrying the connection cookie.
            context: gRPC context (unused).

        Returns:
            A response holding the received bytes, or an empty byte string if nothing could be read.
        """
        connection_value = request.connection.cookie.value
        input_stream = self.outgoing_connections.get(connection_value)
        if input_stream is None:
            input_stream = self._interleave_get_incoming_stream(connection_value)

        if input_stream:
            try:
                # NOTE(review): recv() can block the event loop until data is available.
                data = input_stream.recv(self.BUFFER_SIZE)
                if data:
                    return l2cap_pb2.ReceiveDataResponse(data=bytes(data))
            except Exception as e:
                logging.error('Exception during reading from input stream: %s', e)
        else:
            logging.error('Input stream: %s not found for the connection_value: %s', input_stream, connection_value)

        # Return an empty byte array.
        return l2cap_pb2.ReceiveDataResponse(data=b'')