xref: /aosp_15_r20/external/pigweed/pw_rpc/py/pw_rpc/callback_client/errors.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Exceptions for RPC-related errors."""
15
16
17from pw_status import Status
18
19from pw_rpc.client import PendingRpc
20
21
class RpcTimeout(Exception):
    """Exception reporting that no response arrived for an RPC in time.

    The exception message names ``rpc.method`` and the timeout value. The
    constructor arguments are kept on the instance as ``rpc`` and ``timeout``
    so handlers can inspect them.
    """

    def __init__(self, rpc: PendingRpc, timeout: float | None):
        # Build the message first so the base-class call stays on one line.
        message = f'No response received for {rpc.method} after {timeout} s'
        super().__init__(message)

        # Preserve the inputs for callers that catch this exception.
        self.timeout = timeout
        self.rpc = rpc
29
30
class RpcError(Exception):
    """Raised when there is an RPC-layer error.

    The exception message names ``rpc.method`` and the failing status. For
    ``Status.NOT_FOUND`` and ``Status.DATA_LOSS`` a short explanation is
    appended to the message. The constructor arguments are kept on the
    instance as ``rpc`` and ``status``.
    """

    def __init__(self, rpc: PendingRpc, status: Status):
        """Builds the error message and records the RPC and its status.

        Args:
          rpc: the RPC the error is reported for; its ``method`` attribute is
            included in the message.
          status: the status reported for the RPC.
        """
        # Only these two statuses get an extra explanation; every other
        # status is reported as-is.
        if status is Status.NOT_FOUND:
            msg = ': the RPC server does not support this RPC'
        elif status is Status.DATA_LOSS:
            msg = ': an error occurred while decoding the RPC payload'
        else:
            msg = ''

        super().__init__(f'{rpc.method} failed with error {status}{msg}')
        self.rpc = rpc
        self.status = status
44