xref: /aosp_15_r20/external/grpc-grpc/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2019 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15
# Neutral values for the integer flag / mask arguments used in this file.
_EMPTY_FLAGS = 0
_EMPTY_MASK = 0
# Shared placeholder for "no metadata"; a tuple, so it cannot be mutated.
_IMMUTABLE_EMPTY_METADATA = ()

_UNKNOWN_CANCELLATION_DETAILS = 'RPC cancelled for unknown reason.'

# Templates for the textual representation of a finished RPC. Positional
# placeholders, in order: class name, status, details.
_OK_CALL_REPRESENTATION = (
    '<{} of RPC that terminated with:\n'
    '\tstatus = {}\n'
    '\tdetails = "{}"\n'
    '>'
)

# Same as above with one extra trailing placeholder: debug error string.
_NON_OK_CALL_REPRESENTATION = (
    '<{} of RPC that terminated with:\n'
    '\tstatus = {}\n'
    '\tdetails = "{}"\n'
    '\tdebug_error_string = "{}"\n'
    '>'
)
31
32
cdef int _get_send_initial_metadata_flags(object wait_for_ready) except *:
    """Translates the wait-for-ready option into initial metadata flags.

    Args:
      wait_for_ready: None to leave the decision to Core's default, or a
        truthy/falsy value to set the behaviour explicitly.

    Returns:
      The flag bits, restricted to InitialMetadataFlags.used_mask.
    """
    cdef int flags = 0

    # None means "not specified": neither bit is set in that case.
    if wait_for_ready is not None:
        flags = InitialMetadataFlags.wait_for_ready_explicitly_set
        if wait_for_ready:
            flags |= InitialMetadataFlags.wait_for_ready

    return flags & InitialMetadataFlags.used_mask
43
44
45cdef class _AioCall(GrpcCallWrapper):
46
47    def __cinit__(self, AioChannel channel, object deadline,
48                  bytes method, CallCredentials call_credentials, object wait_for_ready):
49        init_grpc_aio()
50        self.call = NULL
51        self._channel = channel
52        self._loop = channel.loop
53        self._references = []
54        self._status = None
55        self._initial_metadata = None
56        self._waiters_status = []
57        self._waiters_initial_metadata = []
58        self._done_callbacks = []
59        self._is_locally_cancelled = False
60        self._deadline = deadline
61        self._send_initial_metadata_flags = _get_send_initial_metadata_flags(wait_for_ready)
62        self._create_grpc_call(deadline, method, call_credentials)
63
64    def __dealloc__(self):
65        if self.call:
66            grpc_call_unref(self.call)
67        shutdown_grpc_aio()
68
69    def _repr(self) -> str:
70        """Assembles the RPC representation string."""
71        # This needs to be loaded at run time once everything
72        # has been loaded.
73        from grpc import _common
74
75        if not self.done():
76            return '<{} object>'.format(self.__class__.__name__)
77
78        if self._status.code() is StatusCode.ok:
79            return _OK_CALL_REPRESENTATION.format(
80                self.__class__.__name__,
81                _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[self._status.code()],
82                self._status.details())
83        else:
84            return _NON_OK_CALL_REPRESENTATION.format(
85                self.__class__.__name__,
86                self._status.details(),
87                _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[self._status.code()],
88                self._status.debug_error_string())
89
90    def __repr__(self) -> str:
91        return self._repr()
92
93    def __str__(self) -> str:
94        return self._repr()
95
96    cdef void _create_grpc_call(self,
97                                object deadline,
98                                bytes method,
99                                CallCredentials credentials) except *:
100        """Creates the corresponding Core object for this RPC.
101
102        For unary calls, the grpc_call lives shortly and can be destroyed after
103        invoke start_batch. However, if either side is streaming, the grpc_call
104        life span will be longer than one function. So, it would better save it
105        as an instance variable than a stack variable, which reflects its
106        nature in Core.
107        """
108        cdef grpc_slice method_slice
109        cdef gpr_timespec c_deadline = _timespec_from_time(deadline)
110        cdef grpc_call_error set_credentials_error
111
112        method_slice = grpc_slice_from_copied_buffer(
113            <const char *> method,
114            <size_t> len(method)
115        )
116        self.call = grpc_channel_create_call(
117            self._channel.channel,
118            NULL,
119            _EMPTY_MASK,
120            global_completion_queue(),
121            method_slice,
122            NULL,
123            c_deadline,
124            NULL
125        )
126
127        if credentials is not None:
128            set_credentials_error = grpc_call_set_credentials(self.call, credentials.c())
129            if set_credentials_error != GRPC_CALL_OK:
130                raise InternalError("Credentials couldn't have been set: {0}".format(set_credentials_error))
131
132        grpc_slice_unref(method_slice)
133
134    cdef void _set_status(self, AioRpcStatus status) except *:
135        cdef list waiters
136
137        # No more waiters should be expected since status has been set.
138        self._status = status
139
140        if self._initial_metadata is None:
141            self._set_initial_metadata(_IMMUTABLE_EMPTY_METADATA)
142
143        for waiter in self._waiters_status:
144            if not waiter.done():
145                waiter.set_result(None)
146        self._waiters_status = []
147
148        for callback in self._done_callbacks:
149            callback()
150
151    cdef void _set_initial_metadata(self, tuple initial_metadata) except *:
152        if self._initial_metadata is not None:
153            # Some gRPC calls might end before the initial metadata arrived in
154            # the Call object. That causes this method to be invoked twice: 1.
155            # filled with an empty metadata; 2. updated with the actual user
156            # provided metadata.
157            return
158
159        cdef list waiters
160
161        # No more waiters should be expected since initial metadata has been
162        # set.
163        self._initial_metadata = initial_metadata
164
165        for waiter in self._waiters_initial_metadata:
166            if not waiter.done():
167                waiter.set_result(None)
168        self._waiters_initial_metadata = []
169
170    def add_done_callback(self, callback):
171        if self.done():
172            callback()
173        else:
174            self._done_callbacks.append(callback)
175
176    def time_remaining(self):
177        if self._deadline is None:
178            return None
179        else:
180            return max(0, self._deadline - time.time())
181
182    def cancel(self, str details):
183        """Cancels the RPC in Core with given RPC status.
184
185        Above abstractions must invoke this method to set Core objects into
186        proper state.
187        """
188        self._is_locally_cancelled = True
189
190        cdef object details_bytes
191        cdef char *c_details
192        cdef grpc_call_error error
193
194        self._set_status(AioRpcStatus(
195            StatusCode.cancelled,
196            details,
197            None,
198            None,
199        ))
200
201        details_bytes = str_to_bytes(details)
202        self._references.append(details_bytes)
203        c_details = <char *>details_bytes
204        # By implementation, grpc_call_cancel_with_status always return OK
205        error = grpc_call_cancel_with_status(
206            self.call,
207            StatusCode.cancelled,
208            c_details,
209            NULL,
210        )
211        assert error == GRPC_CALL_OK
212
213    def done(self):
214        """Returns if the RPC call has finished.
215
216        Checks if the status has been provided, either
217        because the RPC finished or because was cancelled..
218
219        Returns:
220            True if the RPC can be considered finished.
221        """
222        return self._status is not None
223
224    def cancelled(self):
225        """Returns if the RPC was cancelled.
226
227        Returns:
228            True if the RPC was cancelled.
229        """
230        if not self.done():
231            return False
232
233        return self._status.code() == StatusCode.cancelled
234
235    async def status(self):
236        """Returns the status of the RPC call.
237
238        It returns the finshed status of the RPC. If the RPC
239        has not finished yet this function will wait until the RPC
240        gets finished.
241
242        Returns:
243            Finished status of the RPC as an AioRpcStatus object.
244        """
245        if self._status is not None:
246            return self._status
247
248        future = self._loop.create_future()
249        self._waiters_status.append(future)
250        await future
251
252        return self._status
253
254    def is_ok(self):
255        """Returns if the RPC is ended with ok."""
256        return self.done() and self._status.code() == StatusCode.ok
257
258    async def initial_metadata(self):
259        """Returns the initial metadata of the RPC call.
260
261        If the initial metadata has not been received yet this function will
262        wait until the RPC gets finished.
263
264        Returns:
265            The tuple object with the initial metadata.
266        """
267        if self._initial_metadata is not None:
268            return self._initial_metadata
269
270        future = self._loop.create_future()
271        self._waiters_initial_metadata.append(future)
272        await future
273
274        return self._initial_metadata
275
276    def is_locally_cancelled(self):
277        """Returns if the RPC was cancelled locally.
278
279        Returns:
280            True when was cancelled locally, False when was cancelled remotelly or
281            is still ongoing.
282        """
283        if self._is_locally_cancelled:
284            return True
285
286        return False
287
288    def set_internal_error(self, str error_str):
289        self._set_status(AioRpcStatus(
290            StatusCode.internal,
291            'Internal error from Core',
292            (),
293            error_str,
294        ))
295
296    async def unary_unary(self,
297                          bytes request,
298                          tuple outbound_initial_metadata,
299                          object context = None):
300        """Performs a unary unary RPC.
301
302        Args:
303          request: the serialized requests in bytes.
304          outbound_initial_metadata: optional outbound metadata.
305          context: instrumentation context.
306        """
307        cdef tuple ops
308
309        cdef SendInitialMetadataOperation initial_metadata_op = SendInitialMetadataOperation(
310            outbound_initial_metadata,
311            self._send_initial_metadata_flags)
312        cdef SendMessageOperation send_message_op = SendMessageOperation(request, _EMPTY_FLAGS)
313        cdef SendCloseFromClientOperation send_close_op = SendCloseFromClientOperation(_EMPTY_FLAGS)
314        cdef ReceiveInitialMetadataOperation receive_initial_metadata_op = ReceiveInitialMetadataOperation(_EMPTY_FLAGS)
315        cdef ReceiveMessageOperation receive_message_op = ReceiveMessageOperation(_EMPTY_FLAGS)
316        cdef ReceiveStatusOnClientOperation receive_status_on_client_op = ReceiveStatusOnClientOperation(_EMPTY_FLAGS)
317
318        if context is not None:
319            set_instrumentation_context_on_call_aio(self, context)
320        ops = (initial_metadata_op, send_message_op, send_close_op,
321               receive_initial_metadata_op, receive_message_op,
322               receive_status_on_client_op)
323
324        # Executes all operations in one batch.
325        # Might raise CancelledError, handling it in Python UnaryUnaryCall.
326        await execute_batch(self,
327                            ops,
328                            self._loop)
329
330        self._set_initial_metadata(receive_initial_metadata_op.initial_metadata())
331
332        cdef grpc_status_code code
333        code = receive_status_on_client_op.code()
334
335        self._set_status(AioRpcStatus(
336            code,
337            receive_status_on_client_op.details(),
338            receive_status_on_client_op.trailing_metadata(),
339            receive_status_on_client_op.error_string(),
340        ))
341
342        if code == StatusCode.ok:
343            return receive_message_op.message()
344        else:
345            return None
346
347    async def _handle_status_once_received(self):
348        """Handles the status sent by peer once received."""
349        cdef ReceiveStatusOnClientOperation op = ReceiveStatusOnClientOperation(_EMPTY_FLAGS)
350        cdef tuple ops = (op,)
351        await execute_batch(self, ops, self._loop)
352
353        # Halts if the RPC is locally cancelled
354        if self._is_locally_cancelled:
355            return
356
357        self._set_status(AioRpcStatus(
358            op.code(),
359            op.details(),
360            op.trailing_metadata(),
361            op.error_string(),
362        ))
363
364    async def receive_serialized_message(self):
365        """Receives one single raw message in bytes."""
366        cdef bytes received_message
367
368        # Receives a message. Returns None when failed:
369        # * EOF, no more messages to read;
370        # * The client application cancels;
371        # * The server sends final status.
372        received_message = await _receive_message(
373            self,
374            self._loop
375        )
376        if received_message is not None:
377            return received_message
378        else:
379            return EOF
380
381    async def send_serialized_message(self, bytes message):
382        """Sends one single raw message in bytes."""
383        await _send_message(self,
384                            message,
385                            None,
386                            False,
387                            self._loop)
388
389    async def send_receive_close(self):
390        """Half close the RPC on the client-side."""
391        cdef SendCloseFromClientOperation op = SendCloseFromClientOperation(_EMPTY_FLAGS)
392        cdef tuple ops = (op,)
393        await execute_batch(self, ops, self._loop)
394
395    async def initiate_unary_stream(self,
396                           bytes request,
397                           tuple outbound_initial_metadata,
398                           object context = None):
399        """Implementation of the start of a unary-stream call."""
400        # Peer may prematurely end this RPC at any point. We need a corutine
401        # that watches if the server sends the final status.
402        status_task = self._loop.create_task(self._handle_status_once_received())
403
404        cdef tuple outbound_ops
405        cdef Operation initial_metadata_op = SendInitialMetadataOperation(
406            outbound_initial_metadata,
407            self._send_initial_metadata_flags)
408        cdef Operation send_message_op = SendMessageOperation(
409            request,
410            _EMPTY_FLAGS)
411        cdef Operation send_close_op = SendCloseFromClientOperation(
412            _EMPTY_FLAGS)
413
414        if context is not None:
415            set_instrumentation_context_on_call_aio(self, context)
416        outbound_ops = (
417            initial_metadata_op,
418            send_message_op,
419            send_close_op,
420        )
421
422        try:
423            # Sends out the request message.
424            await execute_batch(self,
425                                outbound_ops,
426                                self._loop)
427
428            # Receives initial metadata.
429            self._set_initial_metadata(
430                await _receive_initial_metadata(self,
431                                                self._loop),
432            )
433        except ExecuteBatchError as batch_error:
434            # Core should explain why this batch failed
435            await status_task
436
437    async def stream_unary(self,
438                           tuple outbound_initial_metadata,
439                           object metadata_sent_observer,
440                           object context = None):
441        """Actual implementation of the complete unary-stream call.
442
443        Needs to pay extra attention to the raise mechanism. If we want to
444        propagate the final status exception, then we have to raise it.
445        Othersize, it would end normally and raise `StopAsyncIteration()`.
446        """
447        try:
448            # Sends out initial_metadata ASAP.
449            await _send_initial_metadata(self,
450                                        outbound_initial_metadata,
451                                        self._send_initial_metadata_flags,
452                                        self._loop)
453            # Notify upper level that sending messages are allowed now.
454            metadata_sent_observer()
455
456            # Receives initial metadata.
457            self._set_initial_metadata(
458                await _receive_initial_metadata(self, self._loop)
459            )
460        except ExecuteBatchError:
461            # Core should explain why this batch failed
462            await self._handle_status_once_received()
463
464            # Allow upper layer to proceed only if the status is set
465            metadata_sent_observer()
466            return None
467
468        cdef tuple inbound_ops
469        cdef ReceiveMessageOperation receive_message_op = ReceiveMessageOperation(_EMPTY_FLAGS)
470        cdef ReceiveStatusOnClientOperation receive_status_on_client_op = ReceiveStatusOnClientOperation(_EMPTY_FLAGS)
471
472        if context is not None:
473            set_instrumentation_context_on_call_aio(self, context)
474        inbound_ops = (receive_message_op, receive_status_on_client_op)
475
476        # Executes all operations in one batch.
477        await execute_batch(self,
478                            inbound_ops,
479                            self._loop)
480
481        cdef grpc_status_code code
482        code = receive_status_on_client_op.code()
483
484        self._set_status(AioRpcStatus(
485            code,
486            receive_status_on_client_op.details(),
487            receive_status_on_client_op.trailing_metadata(),
488            receive_status_on_client_op.error_string(),
489        ))
490
491        if code == StatusCode.ok:
492            return receive_message_op.message()
493        else:
494            return None
495
496    async def initiate_stream_stream(self,
497                           tuple outbound_initial_metadata,
498                           object metadata_sent_observer,
499                           object context = None):
500        """Actual implementation of the complete stream-stream call.
501
502        Needs to pay extra attention to the raise mechanism. If we want to
503        propagate the final status exception, then we have to raise it.
504        Othersize, it would end normally and raise `StopAsyncIteration()`.
505        """
506        # Peer may prematurely end this RPC at any point. We need a corutine
507        # that watches if the server sends the final status.
508        status_task = self._loop.create_task(self._handle_status_once_received())
509
510        if context is not None:
511            set_instrumentation_context_on_call_aio(self, context)
512
513        try:
514            # Sends out initial_metadata ASAP.
515            await _send_initial_metadata(self,
516                                        outbound_initial_metadata,
517                                        self._send_initial_metadata_flags,
518                                        self._loop)
519            # Notify upper level that sending messages are allowed now.
520            metadata_sent_observer()
521
522            # Receives initial metadata.
523            self._set_initial_metadata(
524                await _receive_initial_metadata(self, self._loop)
525            )
526        except ExecuteBatchError as batch_error:
527            # Core should explain why this batch failed
528            await status_task
529
530            # Allow upper layer to proceed only if the status is set
531            metadata_sent_observer()