# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


cdef class CallbackFailureHandler:
    """Turns a failed Core callback into an exception set on a future.

    Stores the name of the Core function that failed, some details about the
    failure and the exception type to instantiate when `handle` is called.
    """

    def __cinit__(self,
                  str core_function_name,
                  object error_details,
                  object exception_type):
        """Handles failure by raising exception."""
        self._core_function_name = core_function_name
        self._error_details = error_details
        self._exception_type = exception_type

    cdef handle(self, object future):
        # The exception is not raised here; it is stored on the future so it
        # surfaces in whoever awaits that future.
        future.set_exception(self._exception_type(
            'Failed "%s": %s' % (self._core_function_name, self._error_details)
        ))


cdef class CallbackWrapper:
    """Bridges a Core completion-queue functor to a Python future.

    The embedded `context` struct carries borrowed pointers to the future, the
    loop, the failure handler and this wrapper itself, so that the static
    `functor_run` callback can recover them from the functor pointer.
    """

    def __cinit__(self, object future, object loop, CallbackFailureHandler failure_handler):
        self.context.functor.functor_run = self.functor_run
        self.context.waiter = <cpython.PyObject*>future
        self.context.loop = <cpython.PyObject*>loop
        self.context.failure_handler = <cpython.PyObject*>failure_handler
        self.context.callback_wrapper = <cpython.PyObject*>self
        # NOTE(lidiz) Not using a list here, because this class is critical in
        # data path. We should make it as efficient as possible.
        # The raw pointers stored in `context` above do not own a reference;
        # these two attributes keep the future and the failure handler alive.
        self._reference_of_future = future
        self._reference_of_failure_handler = failure_handler
        # NOTE(lidiz) We need to ensure when Core invokes our callback, the
        # callback function itself is not deallocated. Otherwise, we will get
        # a segfault. We can view this as Core holding a ref.
        # This reference is released at the end of `functor_run`.
        cpython.Py_INCREF(self)

    @staticmethod
    cdef void functor_run(
            grpc_completion_queue_functor* functor,
            int success) noexcept:
        # The functor pointer is cast straight back to the enclosing context.
        # NOTE(review): this presumes `functor` is the first member of
        # CallbackContext - confirm against the struct declaration.
        cdef CallbackContext *context = <CallbackContext *>functor
        cdef object waiter = <object>context.waiter
        # A cancelled future is left alone; it must not receive a result or
        # an exception.
        if not waiter.cancelled():
            if success == 0:
                (<CallbackFailureHandler>context.failure_handler).handle(waiter)
            else:
                waiter.set_result(None)
        # Release the reference taken in __cinit__ on Core's behalf.
        cpython.Py_DECREF(<object>context.callback_wrapper)

    cdef grpc_completion_queue_functor *c_functor(self):
        # Pointer handed to Core as the completion tag/functor.
        return &self.context.functor


cdef CallbackFailureHandler CQ_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHandler(
    'grpc_completion_queue_shutdown',
    'Unknown',
    InternalError)


class ExecuteBatchError(InternalError):
    """Raised when execute batch returns a failure from Core."""


async def execute_batch(GrpcCallWrapper grpc_call_wrapper,
                        tuple operations,
                        object loop):
    """The callback version of start batch operations.

    Starts `operations` on the call held by `grpc_call_wrapper`, then waits on
    a future created from `loop` that is resolved by the Core callback.

    Raises:
      ExecuteBatchError: If `grpc_call_start_batch` does not return
        GRPC_CALL_OK, or if the callback later reports failure.
    """
    cdef _BatchOperationTag batch_operation_tag = _BatchOperationTag(None, operations, None)
    batch_operation_tag.prepare()

    cdef object future = loop.create_future()
    cdef CallbackWrapper wrapper = CallbackWrapper(
        future,
        loop,
        CallbackFailureHandler('execute_batch', operations, ExecuteBatchError))
    cdef grpc_call_error error = grpc_call_start_batch(
        grpc_call_wrapper.call,
        batch_operation_tag.c_ops,
        batch_operation_tag.c_nops,
        wrapper.c_functor(), NULL)

    if error != GRPC_CALL_OK:
        # The batch was rejected, so the functor is not expected to run and
        # the extra reference taken in CallbackWrapper.__cinit__ would never
        # be released by `functor_run`. Drop it here to avoid leaking the
        # wrapper (and the future/failure handler it keeps alive). The local
        # `wrapper` variable still owns a reference, so this cannot free it.
        cpython.Py_DECREF(wrapper)
        grpc_call_error_string = grpc_call_error_to_string(error).decode()
        raise ExecuteBatchError("Failed grpc_call_start_batch: {} with grpc_call_error value: '{}'".format(error, grpc_call_error_string))

    await future

    cdef grpc_event c_event
    # Tag.event must be called, otherwise messages won't be parsed from C
    batch_operation_tag.event(c_event)


cdef prepend_send_initial_metadata_op(tuple ops, tuple metadata):
    # Eventually, this function should be the only function that produces
    # SendInitialMetadataOperation. So we have more control over the flag.
    # NOTE(review): this file uses both _EMPTY_FLAG and _EMPTY_FLAGS; neither
    # is defined in this view - confirm both names exist and are equivalent.
    return (SendInitialMetadataOperation(
        metadata,
        _EMPTY_FLAG
    ),) + ops


async def _receive_message(GrpcCallWrapper grpc_call_wrapper,
                           object loop):
    """Retrieves parsed messages from Core.

    The messages may already be in Core's buffer, so there isn't a 1-to-1
    mapping between this and the underlying "socket.read()". Also, eventually,
    this function will end with an EOF, which reads empty message.
    """
    cdef ReceiveMessageOperation receive_op = ReceiveMessageOperation(_EMPTY_FLAG)
    cdef tuple ops = (receive_op,)
    try:
        await execute_batch(grpc_call_wrapper, ops, loop)
    except ExecuteBatchError:
        # NOTE(lidiz) The receive message operation has two ways to indicate
        # finish state : 1) returns empty message due to EOF; 2) fails inside
        # the callback (e.g. cancelled).
        #
        # Since both indicate the finish state, they are merged here.
        _LOGGER.debug('Failed to receive any message from Core')
    # NOTE(lidiz) The returned message might be an empty bytes (aka. b'').
    # Please explicitly check if it is None or falsey string object!
    return receive_op.message()


async def _send_message(GrpcCallWrapper grpc_call_wrapper,
                        bytes message,
                        Operation send_initial_metadata_op,
                        int write_flag,
                        object loop):
    """Sends one message, preceded by the initial metadata op when given."""
    cdef SendMessageOperation op = SendMessageOperation(message, write_flag)
    cdef tuple ops = (op,)
    if send_initial_metadata_op is not None:
        # The metadata operation goes first in the same batch.
        ops = (send_initial_metadata_op,) + ops
    await execute_batch(grpc_call_wrapper, ops, loop)


async def _send_initial_metadata(GrpcCallWrapper grpc_call_wrapper,
                                 tuple metadata,
                                 int flags,
                                 object loop):
    """Sends the initial metadata as a single-operation batch."""
    cdef SendInitialMetadataOperation op = SendInitialMetadataOperation(
        metadata,
        flags)
    cdef tuple ops = (op,)
    await execute_batch(grpc_call_wrapper, ops, loop)


async def _receive_initial_metadata(GrpcCallWrapper grpc_call_wrapper,
                                    object loop):
    """Runs a receive-initial-metadata batch and returns the metadata."""
    cdef ReceiveInitialMetadataOperation op = ReceiveInitialMetadataOperation(_EMPTY_FLAGS)
    cdef tuple ops = (op,)
    await execute_batch(grpc_call_wrapper, ops, loop)
    return op.initial_metadata()


async def _send_error_status_from_server(GrpcCallWrapper grpc_call_wrapper,
                                         grpc_status_code code,
                                         str details,
                                         tuple trailing_metadata,
                                         Operation send_initial_metadata_op,
                                         object loop):
    """Sends a non-ok status, preceded by the initial metadata op when given."""
    assert code != StatusCode.ok, 'Expecting non-ok status code.'
    cdef SendStatusFromServerOperation op = SendStatusFromServerOperation(
        trailing_metadata,
        code,
        details,
        _EMPTY_FLAGS,
    )
    cdef tuple ops = (op,)
    if send_initial_metadata_op is not None:
        # The metadata operation goes first in the same batch.
        ops = (send_initial_metadata_op,) + ops
    await execute_batch(grpc_call_wrapper, ops, loop)