# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import threading

import grpc
from grpc_testing import _common

logging.basicConfig()
_LOGGER = logging.getLogger(__name__)


class Rpc(object):
    """Mutable state of a single RPC, shared between a handler and callers.

    Every public method acquires ``self._condition`` before touching state.
    The underscore-prefixed helpers do not acquire it themselves; within this
    class they are only invoked while the condition is already held.

    The ``handler`` object is expected to provide ``send_initial_metadata``,
    ``add_response`` and ``send_termination``, which are the only handler
    methods this class calls.
    """

    def __init__(self, handler, invocation_metadata):
        """Store the handler and invocation metadata and reset all state.

        Args:
          handler: Object that receives initial metadata, responses and
            termination for this RPC.
          invocation_metadata: Value returned later by
            ``invocation_metadata()``.
        """
        self._condition = threading.Condition()
        self._handler = handler
        self._invocation_metadata = invocation_metadata
        self._active = True
        self._initial_metadata_sent = False
        # Values staged by set_trailing_metadata/set_code/set_details and
        # consumed by _complete; None means "not set".
        self._pending_trailing_metadata = None
        self._pending_code = None
        self._pending_details = None
        # Becomes None once the callbacks have been dispatched.
        self._callbacks = []
        self._rpc_errors = []

    def _ensure_initial_metadata_sent(self):
        """Send empty initial metadata to the handler if none was sent yet."""
        if self._initial_metadata_sent:
            return
        self._handler.send_initial_metadata(_common.FUSSED_EMPTY_METADATA)
        self._initial_metadata_sent = True

    def _call_back(self):
        """Run every registered callback on a newly started thread.

        The callback list is snapshotted and then replaced with None, so
        ``add_callback`` returns False from this point on.
        """
        registered = tuple(self._callbacks)
        self._callbacks = None

        def call_back():
            for registered_callback in registered:
                try:
                    registered_callback()
                except Exception:  # pylint: disable=broad-except
                    _LOGGER.exception("Exception calling server-side callback!")

        threading.Thread(target=call_back).start()

    def _terminate(self, trailing_metadata, code, details):
        """Terminate the RPC once; do nothing if it is no longer active.

        Marks the RPC inactive, forwards the termination to the handler,
        dispatches callbacks and notifies all waiters on the condition.
        """
        if not self._active:
            return
        self._active = False
        self._handler.send_termination(trailing_metadata, code, details)
        self._call_back()
        self._condition.notify_all()

    def _complete(self):
        """Terminate using the pending values, with defaults for unset ones.

        Defaults are empty trailing metadata, ``grpc.StatusCode.OK`` and an
        empty details string.
        """
        staged_metadata = self._pending_trailing_metadata
        staged_code = self._pending_code
        staged_details = self._pending_details

        trailing_metadata = (
            _common.FUSSED_EMPTY_METADATA
            if staged_metadata is None
            else staged_metadata
        )
        code = grpc.StatusCode.OK if staged_code is None else staged_code
        if staged_details is None:
            details = ""
        else:
            details = staged_details
        self._terminate(trailing_metadata, code, details)

    def _abort(self, code, details):
        """Terminate with the given code and details and empty metadata."""
        self._terminate(_common.FUSSED_EMPTY_METADATA, code, details)

    def add_rpc_error(self, rpc_error):
        """Record an error so ``application_exception_abort`` won't log it."""
        with self._condition:
            self._rpc_errors.append(rpc_error)

    def application_cancel(self):
        """Abort the RPC with ``CANCELLED`` status."""
        with self._condition:
            self._abort(
                grpc.StatusCode.CANCELLED,
                "Cancelled by server-side application!",
            )

    def application_exception_abort(self, exception):
        """Abort the RPC with ``UNKNOWN`` status because of ``exception``.

        The exception is logged unless it was previously recorded through
        ``add_rpc_error``.
        """
        with self._condition:
            already_recorded = exception in self._rpc_errors
            if not already_recorded:
                _LOGGER.exception("Exception calling application!")
            details = "Exception calling application: {}".format(exception)
            self._abort(grpc.StatusCode.UNKNOWN, details)

    def extrinsic_abort(self):
        """Deactivate the RPC without sending a termination to the handler.

        Callbacks are still dispatched and waiters are still notified. Does
        nothing if the RPC is already inactive.
        """
        with self._condition:
            if not self._active:
                return
            self._active = False
            self._call_back()
            self._condition.notify_all()

    def unary_response_complete(self, response):
        """Deliver the single response to the handler and complete the RPC."""
        with self._condition:
            self._ensure_initial_metadata_sent()
            self._handler.add_response(response)
            self._complete()

    def stream_response(self, response):
        """Deliver one response to the handler without completing the RPC."""
        with self._condition:
            self._ensure_initial_metadata_sent()
            self._handler.add_response(response)

    def stream_response_complete(self):
        """Complete the RPC, sending initial metadata first if needed."""
        with self._condition:
            self._ensure_initial_metadata_sent()
            self._complete()

    def send_initial_metadata(self, initial_metadata):
        """Send ``initial_metadata`` to the handler unless already sent.

        Returns:
          True if the metadata was sent by this call, False if initial
          metadata had already been sent.
        """
        with self._condition:
            if self._initial_metadata_sent:
                return False
            self._handler.send_initial_metadata(initial_metadata)
            self._initial_metadata_sent = True
            return True

    def is_active(self):
        """Return True until the RPC has been terminated or aborted."""
        with self._condition:
            return self._active

    def add_callback(self, callback):
        """Register ``callback`` to run when the RPC ends.

        Returns:
          True if the callback was registered, False if the callbacks have
          already been dispatched.
        """
        with self._condition:
            if self._callbacks is None:
                return False
            self._callbacks.append(callback)
            return True

    def invocation_metadata(self):
        """Return the invocation metadata given at construction."""
        with self._condition:
            return self._invocation_metadata

    def set_trailing_metadata(self, trailing_metadata):
        """Stage the trailing metadata to use when the RPC completes."""
        with self._condition:
            self._pending_trailing_metadata = trailing_metadata

    def set_code(self, code):
        """Stage the status code to use when the RPC completes."""
        with self._condition:
            self._pending_code = code

    def set_details(self, details):
        """Stage the status details to use when the RPC completes."""
        with self._condition:
            self._pending_details = details