# Copyright 2024 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Utilities for using ``pw_rpc.Client``."""

import logging
from typing import Any, Iterable

from pw_protobuf_compiler import python_protos
import pw_rpc
from pw_rpc import callback_client
from pw_stream import stream_readers

_LOG = logging.getLogger('pw_rpc.client_utils')
_VERBOSE = logging.DEBUG - 1


# Either a collection of .proto paths / proto modules, or a ready-made
# ``python_protos.Library``.
PathsModulesOrProtoLibrary = (
    Iterable[python_protos.PathOrModule] | python_protos.Library
)


class RpcClient:
    """An RPC client with configurable incoming data processing.

    Instances can be used as context managers; leaving the ``with`` block
    calls ``close()``.
    """

    def __init__(
        self,
        reader_and_executor: stream_readers.DataReaderAndExecutor,
        paths_or_modules: PathsModulesOrProtoLibrary,
        channels: Iterable[pw_rpc.Channel],
        client_impl: pw_rpc.client.ClientImpl | None = None,
    ):
        """Creates an RPC client and starts its reader.

        Args:
          reader_and_executor: ``DataReaderAndExecutor`` instance. It is
            started by this constructor and stopped by ``close()``.
          paths_or_modules: paths to .proto files or proto modules, or an
            existing ``python_protos.Library``, which is used as-is.
          channels: RPC channels to use for output.
          client_impl: The RPC client implementation. Defaults to the callback
            client implementation if not provided.
        """
        # Reuse a prebuilt proto library; otherwise build one from the inputs.
        self.protos = (
            paths_or_modules
            if isinstance(paths_or_modules, python_protos.Library)
            else python_protos.Library.from_paths(paths_or_modules)
        )

        impl = callback_client.Impl() if client_impl is None else client_impl
        proto_modules = self.protos.modules()
        self.client = pw_rpc.Client.from_modules(impl, channels, proto_modules)

        # The client exists at this point, so it is safe to start the
        # background thread that reads and processes RPC packets.
        self._reader_and_executor = reader_and_executor
        reader_and_executor.start()

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        # Returns None, so an exception raised in the ``with`` body propagates.
        self.close()

    def close(self) -> None:
        """Stops the reader and executor that was started by ``__init__``."""
        self._reader_and_executor.stop()

    def rpcs(self, channel_id: int | None = None) -> Any:
        """Returns object for accessing services on the specified channel.

        This skips some intermediate layers to make it simpler to invoke RPCs
        directly from this client.

        Args:
          channel_id: ID of the channel to look up. When ``None``, the first
            channel produced by ``self.client.channels()`` is used, so the ID
            may be omitted if only one channel is in use.
        """
        if channel_id is not None:
            return self.client.channel(channel_id).rpcs

        # No ID given: fall back to the first channel the client reports.
        first_channel = next(iter(self.client.channels()))
        return first_channel.rpcs

    def handle_rpc_packet(self, packet: bytes) -> None:
        """Passes ``packet`` to the client; logs an error if it is rejected."""
        handled = self.client.process_packet(packet)
        if handled:
            return
        _LOG.error('Packet not handled by RPC client: %s', packet)


class NoEncodingSingleChannelRpcClient(RpcClient):
    """An RPC client without any frame encoding with a single channel output.

    The caveat is that the provided read function must read entire frames.
    """

    def __init__(
        self,
        reader: stream_readers.CancellableReader,
        paths_or_modules: PathsModulesOrProtoLibrary,
        channel: pw_rpc.Channel,
        client_impl: pw_rpc.client.ClientImpl | None = None,
    ):
        """Creates an RPC client over a single channel with no frame encoding.

        Args:
          reader: Readable object used to receive RPC packets.
          paths_or_modules: paths to .proto files or proto modules.
          channel: RPC channel to use for output.
          client_impl: The RPC Client implementation. Defaults to the callback
            client implementation if not provided.
        """

        def _passthrough(data: bytes):
            # No decoding step: yield the data that was read, unchanged.
            yield data

        def _log_read_error(exc: Exception) -> None:
            _LOG.error('data reader encountered an error', exc_info=exc)

        executor = stream_readers.DataReaderAndExecutor(
            reader, _log_read_error, _passthrough, self.handle_rpc_packet
        )
        super().__init__(
            reader_and_executor=executor,
            paths_or_modules=paths_or_modules,
            channels=[channel],
            client_impl=client_impl,
        )