xref: /aosp_15_r20/external/pigweed/pw_rpc/py/pw_rpc/client_utils.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2024 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Utilities for using ``pw_rpc.Client``."""
15
16import logging
17from typing import Any, Iterable
18
19from pw_protobuf_compiler import python_protos
20import pw_rpc
21from pw_rpc import callback_client
22from pw_stream import stream_readers
23
24_LOG = logging.getLogger('pw_rpc.client_utils')
25_VERBOSE = logging.DEBUG - 1
26
27
28PathsModulesOrProtoLibrary = (
29    Iterable[python_protos.PathOrModule] | python_protos.Library
30)
31
32
33class RpcClient:
34    """An RPC client with configurable incoming data processing."""
35
36    def __init__(
37        self,
38        reader_and_executor: stream_readers.DataReaderAndExecutor,
39        paths_or_modules: PathsModulesOrProtoLibrary,
40        channels: Iterable[pw_rpc.Channel],
41        client_impl: pw_rpc.client.ClientImpl | None = None,
42    ):
43        """Creates an RPC client.
44
45        Args:
46          reader_and_executor: ``DataReaderAndExecutor`` instance.
47          paths_or_modules: paths to .proto files or proto modules.
48          channels: RPC channels to use for output.
49          client_impl: The RPC client implementation. Defaults to the callback
50            client implementation if not provided.
51        """
52        if isinstance(paths_or_modules, python_protos.Library):
53            self.protos = paths_or_modules
54        else:
55            self.protos = python_protos.Library.from_paths(paths_or_modules)
56
57        if client_impl is None:
58            client_impl = callback_client.Impl()
59
60        self.client = pw_rpc.Client.from_modules(
61            client_impl, channels, self.protos.modules()
62        )
63
64        # Start background thread that reads and processes RPC packets.
65        self._reader_and_executor = reader_and_executor
66        self._reader_and_executor.start()
67
68    def __enter__(self):
69        return self
70
71    def __exit__(self, *exc_info):
72        self.close()
73
74    def close(self) -> None:
75        self._reader_and_executor.stop()
76
77    def rpcs(self, channel_id: int | None = None) -> Any:
78        """Returns object for accessing services on the specified channel.
79
80        This skips some intermediate layers to make it simpler to invoke RPCs
81        from an ``HdlcRpcClient``. If only one channel is in use, the channel ID
82        is not necessary.
83        """
84        if channel_id is None:
85            return next(iter(self.client.channels())).rpcs
86
87        return self.client.channel(channel_id).rpcs
88
89    def handle_rpc_packet(self, packet: bytes) -> None:
90        if not self.client.process_packet(packet):
91            _LOG.error('Packet not handled by RPC client: %s', packet)
92
93
94class NoEncodingSingleChannelRpcClient(RpcClient):
95    """An RPC client without any frame encoding with a single channel output.
96
97    The caveat is that the provided read function must read entire frames.
98    """
99
100    def __init__(
101        self,
102        reader: stream_readers.CancellableReader,
103        paths_or_modules: PathsModulesOrProtoLibrary,
104        channel: pw_rpc.Channel,
105        client_impl: pw_rpc.client.ClientImpl | None = None,
106    ):
107        """Creates an RPC client over a single channel with no frame encoding.
108
109        Args:
110          reader: Readable object used to receive RPC packets.
111          paths_or_modules: paths to .proto files or proto modules.
112          channel: RPC channel to use for output.
113          client_impl: The RPC Client implementation. Defaults to the callback
114            client implementation if not provided.
115        """
116
117        def process_data(data: bytes):
118            yield data
119
120        def on_read_error(exc: Exception) -> None:
121            _LOG.error('data reader encountered an error', exc_info=exc)
122
123        reader_and_executor = stream_readers.DataReaderAndExecutor(
124            reader, on_read_error, process_data, self.handle_rpc_packet
125        )
126        super().__init__(
127            reader_and_executor, paths_or_modules, [channel], client_impl
128        )
129