xref: /aosp_15_r20/external/grpc-grpc/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2019 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15
cdef class CallbackFailureHandler:
    """Converts a failed Core operation into an exception set on a future.

    The Core function name, the error details and the exception type are
    captured at construction time; ``handle`` only needs the future to fail.
    """

    def __cinit__(self,
                  str core_function_name,
                  object error_details,
                  object exception_type):
        # Stored as-is; the message is only formatted when a failure is
        # actually reported through handle().
        self._exception_type = exception_type
        self._error_details = error_details
        self._core_function_name = core_function_name

    cdef handle(self, object future):
        # Fails the future with: Failed "<core function name>": <details>
        cdef object description = 'Failed "%s": %s' % (
            self._core_function_name,
            self._error_details,
        )
        future.set_exception(self._exception_type(description))
31
32
cdef class CallbackWrapper:
    """Bridges a future to a Core completion-queue functor.

    The embedded ``context`` holds the functor handed to Core together with
    raw pointers to the future, the loop, the failure handler and this wrapper
    itself, so that ``functor_run`` can resolve the future when Core calls back.
    """

    def __cinit__(self, object future, object loop, CallbackFailureHandler failure_handler):
        # Route Core's callback to the static trampoline below, and stash raw
        # pointers (these casts do not take a reference) to everything the
        # trampoline needs.
        self.context.functor.functor_run = self.functor_run
        self.context.waiter = <cpython.PyObject*>future
        self.context.loop = <cpython.PyObject*>loop
        self.context.failure_handler = <cpython.PyObject*>failure_handler
        self.context.callback_wrapper = <cpython.PyObject*>self
        # NOTE(lidiz) Not using a list here, because this class is critical in
        # data path. We should make it as efficient as possible.
        # These attributes keep the future and the failure handler alive for
        # as long as the wrapper itself is alive.
        # NOTE(review): no strong reference to `loop` is stored here --
        # presumably the caller keeps the loop alive; confirm.
        self._reference_of_future = future
        self._reference_of_failure_handler = failure_handler
        # NOTE(lidiz) We need to ensure when Core invokes our callback, the
        # callback function itself is not deallocated. Otherwise, we will get
        # a segfault. We can view this as Core holding a ref.
        # The matching Py_DECREF is at the end of functor_run.
        cpython.Py_INCREF(self)

    @staticmethod
    cdef void functor_run(
            grpc_completion_queue_functor* functor,
            int success) noexcept:
        # The functor pointer is reinterpreted as the enclosing context.
        # NOTE(review): this assumes `functor` is the first member of
        # CallbackContext -- confirm against the struct declaration.
        cdef CallbackContext *context = <CallbackContext *>functor
        cdef object waiter = <object>context.waiter
        # A cancelled future is left untouched; otherwise success == 0 is
        # reported through the failure handler and any other value resolves
        # the future with None.
        if not waiter.cancelled():
            if success == 0:
                (<CallbackFailureHandler>context.failure_handler).handle(waiter)
            else:
                waiter.set_result(None)
        # Releases the reference taken on Core's behalf in __cinit__.
        cpython.Py_DECREF(<object>context.callback_wrapper)

    cdef grpc_completion_queue_functor *c_functor(self):
        # Address of the functor embedded in this wrapper's context; this is
        # the value passed to Core as the batch tag.
        return &self.context.functor
65
66
# Module-level failure handler for grpc_completion_queue_shutdown: it fails the
# future with InternalError('Failed "grpc_completion_queue_shutdown": Unknown').
cdef CallbackFailureHandler CQ_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHandler(
    'grpc_completion_queue_shutdown', 'Unknown', InternalError)
71
72
class ExecuteBatchError(InternalError):
    """Signals that Core rejected or failed a batch started by execute_batch."""
75
76
async def execute_batch(GrpcCallWrapper grpc_call_wrapper,
                               tuple operations,
                               object loop):
    """The callback version of start batch operations.

    Starts ``operations`` as one batch on the wrapped call and suspends until
    Core invokes the completion callback.

    Args:
      grpc_call_wrapper: Wrapper whose ``call`` the batch is started on.
      operations: Tuple of operations submitted together as a single batch.
      loop: Event loop used to create the future that is awaited.

    Raises:
      ExecuteBatchError: If grpc_call_start_batch does not return
        GRPC_CALL_OK, or if Core reports the batch as unsuccessful through
        the callback.
    """
    cdef _BatchOperationTag batch_operation_tag = _BatchOperationTag(None, operations, None)
    batch_operation_tag.prepare()

    cdef object future = loop.create_future()
    # The wrapper takes an extra reference to itself on construction, on
    # behalf of Core; it is released when Core runs the functor.
    cdef CallbackWrapper wrapper = CallbackWrapper(
        future,
        loop,
        CallbackFailureHandler('execute_batch', operations, ExecuteBatchError))
    cdef grpc_call_error error = grpc_call_start_batch(
        grpc_call_wrapper.call,
        batch_operation_tag.c_ops,
        batch_operation_tag.c_nops,
        wrapper.c_functor(), NULL)

    if error != GRPC_CALL_OK:
        # A rejected batch is never completed, so Core will not run the
        # functor and the reference taken in CallbackWrapper.__cinit__ would
        # never be released. Drop it here so the wrapper (and the future and
        # failure handler it keeps alive) can be collected. The local
        # variable `wrapper` still owns a reference, so this cannot free the
        # object prematurely.
        cpython.Py_DECREF(wrapper)
        grpc_call_error_string = grpc_call_error_to_string(error).decode()
        raise ExecuteBatchError("Failed grpc_call_start_batch: {} with grpc_call_error value: '{}'".format(error, grpc_call_error_string))

    # Resolved (or failed with ExecuteBatchError) by CallbackWrapper.functor_run.
    await future

    cdef grpc_event c_event
    # Tag.event must be called, otherwise messages won't be parsed from C
    batch_operation_tag.event(c_event)
104
105
cdef prepend_send_initial_metadata_op(tuple ops, tuple metadata):
    # Builds a SendInitialMetadataOperation for `metadata` (with the empty
    # flag) and returns a new tuple that has it in front of `ops`.
    #
    # Eventually, this function should be the only function that produces
    # SendInitialMetadataOperation. So we have more control over the flag.
    cdef SendInitialMetadataOperation initial_metadata_op = SendInitialMetadataOperation(
        metadata, _EMPTY_FLAG)
    return (initial_metadata_op,) + ops
113
114
async def _receive_message(GrpcCallWrapper grpc_call_wrapper,
                           object loop):
    """Retrieves a parsed message from Core.

    The message may already be sitting in Core's buffer, so a call here does
    not map 1-to-1 onto an underlying "socket.read()". Eventually the stream
    ends with an EOF, which reads as an empty message.

    A failed batch is not propagated: it is logged at debug level and the
    operation's message is returned regardless.
    """
    cdef ReceiveMessageOperation op = ReceiveMessageOperation(_EMPTY_FLAG)
    try:
        await execute_batch(grpc_call_wrapper, (op,), loop)
    except ExecuteBatchError:
        # NOTE(lidiz) Receiving can signal the finished state in two ways:
        # 1) an empty message caused by EOF; 2) a failure inside the callback
        # (e.g. cancelled).
        #
        # Both mean "finished", so they are folded into the same outcome.
        _LOGGER.debug('Failed to receive any message from Core')
    # NOTE(lidiz) The result can be empty bytes (aka. b''). Callers must
    # explicitly distinguish None from a falsey bytes object!
    return op.message()
137
138
async def _send_message(GrpcCallWrapper grpc_call_wrapper,
                        bytes message,
                        Operation send_initial_metadata_op,
                        int write_flag,
                        object loop):
    """Sends one message, preceded by initial metadata when an op is given.

    When ``send_initial_metadata_op`` is not None it is placed ahead of the
    send-message operation in the same batch.
    """
    cdef SendMessageOperation send_op = SendMessageOperation(message, write_flag)
    cdef tuple batch
    if send_initial_metadata_op is None:
        batch = (send_op,)
    else:
        batch = (send_initial_metadata_op, send_op)
    await execute_batch(grpc_call_wrapper, batch, loop)
149
150
async def _send_initial_metadata(GrpcCallWrapper grpc_call_wrapper,
                                 tuple metadata,
                                 int flags,
                                 object loop):
    """Sends ``metadata`` as initial metadata in a single-operation batch."""
    cdef SendInitialMetadataOperation send_op = SendInitialMetadataOperation(metadata, flags)
    await execute_batch(grpc_call_wrapper, (send_op,), loop)
160
161
async def _receive_initial_metadata(GrpcCallWrapper grpc_call_wrapper,
                                    object loop):
    """Runs a receive-initial-metadata batch and returns what it collected."""
    cdef ReceiveInitialMetadataOperation receive_op = ReceiveInitialMetadataOperation(_EMPTY_FLAGS)
    await execute_batch(grpc_call_wrapper, (receive_op,), loop)
    return receive_op.initial_metadata()
168
async def _send_error_status_from_server(GrpcCallWrapper grpc_call_wrapper,
                                         grpc_status_code code,
                                         str details,
                                         tuple trailing_metadata,
                                         Operation send_initial_metadata_op,
                                         object loop):
    """Sends a non-ok status (with details and trailing metadata) to the peer.

    When ``send_initial_metadata_op`` is not None it is placed ahead of the
    status operation in the same batch.
    """
    cdef SendStatusFromServerOperation status_op
    cdef tuple batch
    # Checked before the operation is built, as this path is for errors only.
    assert code != StatusCode.ok, 'Expecting non-ok status code.'
    status_op = SendStatusFromServerOperation(
        trailing_metadata, code, details, _EMPTY_FLAGS)
    if send_initial_metadata_op is None:
        batch = (status_op,)
    else:
        batch = (send_initial_metadata_op, status_op)
    await execute_batch(grpc_call_wrapper, batch, loop)
186