xref: /aosp_15_r20/external/grpc-grpc/src/python/grpcio_tests/tests/unit/_abort_test.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2018 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Tests server context abort mechanism"""
15
16import collections
17import gc
18import logging
19import unittest
20import weakref
21
22import grpc
23
24from tests.unit import test_common
25from tests.unit.framework.common import test_constants
26
27_ABORT = "/test/abort"
28_ABORT_WITH_STATUS = "/test/AbortWithStatus"
29_INVALID_CODE = "/test/InvalidCode"
30
31_REQUEST = b"\x00\x00\x00"
32_RESPONSE = b"\x00\x00\x00"
33
34_ABORT_DETAILS = "Abandon ship!"
35_ABORT_METADATA = (("a-trailing-metadata", "42"),)
36
37
38class _Status(
39    collections.namedtuple("_Status", ("code", "details", "trailing_metadata")),
40    grpc.Status,
41):
42    pass
43
44
45class _Object(object):
46    pass
47
48
49do_not_leak_me = _Object()
50
51
52def abort_unary_unary(request, servicer_context):
53    this_should_not_be_leaked = do_not_leak_me
54    servicer_context.abort(
55        grpc.StatusCode.INTERNAL,
56        _ABORT_DETAILS,
57    )
58    raise Exception("This line should not be executed!")
59
60
61def abort_with_status_unary_unary(request, servicer_context):
62    servicer_context.abort_with_status(
63        _Status(
64            code=grpc.StatusCode.INTERNAL,
65            details=_ABORT_DETAILS,
66            trailing_metadata=_ABORT_METADATA,
67        )
68    )
69    raise Exception("This line should not be executed!")
70
71
72def invalid_code_unary_unary(request, servicer_context):
73    servicer_context.abort(
74        42,
75        _ABORT_DETAILS,
76    )
77
78
79class _GenericHandler(grpc.GenericRpcHandler):
80    def service(self, handler_call_details):
81        if handler_call_details.method == _ABORT:
82            return grpc.unary_unary_rpc_method_handler(abort_unary_unary)
83        elif handler_call_details.method == _ABORT_WITH_STATUS:
84            return grpc.unary_unary_rpc_method_handler(
85                abort_with_status_unary_unary
86            )
87        elif handler_call_details.method == _INVALID_CODE:
88            return grpc.stream_stream_rpc_method_handler(
89                invalid_code_unary_unary
90            )
91        else:
92            return None
93
94
95class AbortTest(unittest.TestCase):
96    def setUp(self):
97        self._server = test_common.test_server()
98        port = self._server.add_insecure_port("[::]:0")
99        self._server.add_generic_rpc_handlers((_GenericHandler(),))
100        self._server.start()
101
102        self._channel = grpc.insecure_channel("localhost:%d" % port)
103
104    def tearDown(self):
105        self._channel.close()
106        self._server.stop(0)
107
108    def test_abort(self):
109        with self.assertRaises(grpc.RpcError) as exception_context:
110            self._channel.unary_unary(
111                _ABORT,
112                _registered_method=True,
113            )(_REQUEST)
114        rpc_error = exception_context.exception
115
116        self.assertEqual(rpc_error.code(), grpc.StatusCode.INTERNAL)
117        self.assertEqual(rpc_error.details(), _ABORT_DETAILS)
118
119    # This test ensures that abort() does not store the raised exception, which
120    # on Python 3 (via the `__traceback__` attribute) holds a reference to
121    # all local vars. Storing the raised exception can prevent GC and stop the
122    # grpc_call from being unref'ed, even after server shutdown.
123    @unittest.skip("https://github.com/grpc/grpc/issues/17927")
124    def test_abort_does_not_leak_local_vars(self):
125        global do_not_leak_me  # pylint: disable=global-statement
126        weak_ref = weakref.ref(do_not_leak_me)
127
128        # Servicer will abort() after creating a local ref to do_not_leak_me.
129        with self.assertRaises(grpc.RpcError):
130            self._channel.unary_unary(
131                _ABORT,
132                _registered_method=True,
133            )(_REQUEST)
134
135        # Server may still have a stack frame reference to the exception even
136        # after client sees error, so ensure server has shutdown.
137        self._server.stop(None)
138        do_not_leak_me = None
139        self.assertIsNone(weak_ref())
140
141    def test_abort_with_status(self):
142        with self.assertRaises(grpc.RpcError) as exception_context:
143            self._channel.unary_unary(
144                _ABORT_WITH_STATUS,
145                _registered_method=True,
146            )(_REQUEST)
147        rpc_error = exception_context.exception
148
149        self.assertEqual(rpc_error.code(), grpc.StatusCode.INTERNAL)
150        self.assertEqual(rpc_error.details(), _ABORT_DETAILS)
151        self.assertEqual(rpc_error.trailing_metadata(), _ABORT_METADATA)
152
153    def test_invalid_code(self):
154        with self.assertRaises(grpc.RpcError) as exception_context:
155            self._channel.unary_unary(
156                _INVALID_CODE,
157                _registered_method=True,
158            )(_REQUEST)
159        rpc_error = exception_context.exception
160
161        self.assertEqual(rpc_error.code(), grpc.StatusCode.UNKNOWN)
162        self.assertEqual(rpc_error.details(), _ABORT_DETAILS)
163
164
165if __name__ == "__main__":
166    logging.basicConfig()
167    unittest.main(verbosity=2)
168