# Copyright 2018 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests server context abort mechanism"""

import collections
import gc
import logging
import unittest
import weakref

import grpc

from tests.unit import test_common
from tests.unit.framework.common import test_constants

# Fully-qualified method names routed by _GenericHandler.
_ABORT = "/test/abort"
_ABORT_WITH_STATUS = "/test/AbortWithStatus"
_INVALID_CODE = "/test/InvalidCode"

_REQUEST = b"\x00\x00\x00"
_RESPONSE = b"\x00\x00\x00"

# Details and trailing metadata the aborting handlers send and the tests
# expect to read back from the client-side RpcError.
_ABORT_DETAILS = "Abandon ship!"
_ABORT_METADATA = (("a-trailing-metadata", "42"),)


class _Status(
    collections.namedtuple("_Status", ("code", "details", "trailing_metadata")),
    grpc.Status,
):
    """Immutable grpc.Status carrying code, details and trailing metadata."""


class _Object(object):
    """Plain object that, unlike a bare `object()`, can be weakly referenced."""


# Module-level sentinel used by test_abort_does_not_leak_local_vars to detect
# whether a handler's local variables outlive an aborted RPC.
do_not_leak_me = _Object()


def abort_unary_unary(request, servicer_context):
    """Abort the RPC with INTERNAL and _ABORT_DETAILS.

    The local reference to `do_not_leak_me` is deliberate: the leak test
    checks that this frame's locals are released after the abort.
    """
    this_should_not_be_leaked = do_not_leak_me
    servicer_context.abort(
        grpc.StatusCode.INTERNAL,
        _ABORT_DETAILS,
    )
    raise Exception("This line should not be executed!")


def abort_with_status_unary_unary(request, servicer_context):
    """Abort the RPC via abort_with_status, including trailing metadata."""
    servicer_context.abort_with_status(
        _Status(
            code=grpc.StatusCode.INTERNAL,
            details=_ABORT_DETAILS,
            trailing_metadata=_ABORT_METADATA,
        )
    )
    raise Exception("This line should not be executed!")


def invalid_code_unary_unary(request, servicer_context):
    """Abort the RPC with 42, which is not a grpc.StatusCode member.

    test_invalid_code expects the client to observe StatusCode.UNKNOWN with
    the original details.
    """
    servicer_context.abort(
        42,
        _ABORT_DETAILS,
    )


class _GenericHandler(grpc.GenericRpcHandler):
    """Routes the three test method names to their aborting behaviors."""

    def service(self, handler_call_details):
        """Return the handler for the requested method, or None if unknown."""
        if handler_call_details.method == _ABORT:
            return grpc.unary_unary_rpc_method_handler(abort_unary_unary)
        elif handler_call_details.method == _ABORT_WITH_STATUS:
            return grpc.unary_unary_rpc_method_handler(
                abort_with_status_unary_unary
            )
        elif handler_call_details.method == _INVALID_CODE:
            # The behavior takes a single request and the client invokes it
            # through channel.unary_unary, so register it with the matching
            # unary-unary handler kind like the other two methods.
            return grpc.unary_unary_rpc_method_handler(
                invalid_code_unary_unary
            )
        else:
            return None


class AbortTest(unittest.TestCase):
    """Exercises ServicerContext.abort / abort_with_status end to end."""

    def setUp(self):
        """Start a server on an ephemeral port and open a channel to it."""
        self._server = test_common.test_server()
        port = self._server.add_insecure_port("[::]:0")
        self._server.add_generic_rpc_handlers((_GenericHandler(),))
        self._server.start()

        self._channel = grpc.insecure_channel("localhost:%d" % port)

    def tearDown(self):
        """Close the channel, then stop the server with no grace period."""
        self._channel.close()
        self._server.stop(0)

    def test_abort(self):
        """abort() surfaces its code and details to the client."""
        with self.assertRaises(grpc.RpcError) as exception_context:
            self._channel.unary_unary(
                _ABORT,
                _registered_method=True,
            )(_REQUEST)
        rpc_error = exception_context.exception

        self.assertEqual(rpc_error.code(), grpc.StatusCode.INTERNAL)
        self.assertEqual(rpc_error.details(), _ABORT_DETAILS)

    # This test ensures that abort() does not store the raised exception, which
    # on Python 3 (via the `__traceback__` attribute) holds a reference to
    # all local vars. Storing the raised exception can prevent GC and stop the
    # grpc_call from being unref'ed, even after server shutdown.
    @unittest.skip("https://github.com/grpc/grpc/issues/17927")
    def test_abort_does_not_leak_local_vars(self):
        """The handler's locals must be collectable once the server stops."""
        global do_not_leak_me  # pylint: disable=global-statement
        weak_ref = weakref.ref(do_not_leak_me)

        # Servicer will abort() after creating a local ref to do_not_leak_me.
        with self.assertRaises(grpc.RpcError):
            self._channel.unary_unary(
                _ABORT,
                _registered_method=True,
            )(_REQUEST)

        # Server may still have a stack frame reference to the exception even
        # after client sees error, so ensure server has shutdown.
        self._server.stop(None)
        # Drop the module's own strong reference; any object still alive
        # afterwards is being held by something else.
        do_not_leak_me = None
        self.assertIsNone(weak_ref())

    def test_abort_with_status(self):
        """abort_with_status() also delivers the trailing metadata."""
        with self.assertRaises(grpc.RpcError) as exception_context:
            self._channel.unary_unary(
                _ABORT_WITH_STATUS,
                _registered_method=True,
            )(_REQUEST)
        rpc_error = exception_context.exception

        self.assertEqual(rpc_error.code(), grpc.StatusCode.INTERNAL)
        self.assertEqual(rpc_error.details(), _ABORT_DETAILS)
        self.assertEqual(rpc_error.trailing_metadata(), _ABORT_METADATA)

    def test_invalid_code(self):
        """Aborting with a non-StatusCode value is reported as UNKNOWN."""
        with self.assertRaises(grpc.RpcError) as exception_context:
            self._channel.unary_unary(
                _INVALID_CODE,
                _registered_method=True,
            )(_REQUEST)
        rpc_error = exception_context.exception

        self.assertEqual(rpc_error.code(), grpc.StatusCode.UNKNOWN)
        self.assertEqual(rpc_error.details(), _ABORT_DETAILS)


if __name__ == "__main__":
    logging.basicConfig()
    unittest.main(verbosity=2)