1# Copyright 2019 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15 16_EMPTY_FLAGS = 0 17_EMPTY_MASK = 0 18_IMMUTABLE_EMPTY_METADATA = tuple() 19 20_UNKNOWN_CANCELLATION_DETAILS = 'RPC cancelled for unknown reason.' 21_OK_CALL_REPRESENTATION = ('<{} of RPC that terminated with:\n' 22 '\tstatus = {}\n' 23 '\tdetails = "{}"\n' 24 '>') 25 26_NON_OK_CALL_REPRESENTATION = ('<{} of RPC that terminated with:\n' 27 '\tstatus = {}\n' 28 '\tdetails = "{}"\n' 29 '\tdebug_error_string = "{}"\n' 30 '>') 31 32 33cdef int _get_send_initial_metadata_flags(object wait_for_ready) except *: 34 cdef int flags = 0 35 # Wait-for-ready can be None, which means using default value in Core. 36 if wait_for_ready is not None: 37 flags |= InitialMetadataFlags.wait_for_ready_explicitly_set 38 if wait_for_ready: 39 flags |= InitialMetadataFlags.wait_for_ready 40 41 flags &= InitialMetadataFlags.used_mask 42 return flags 43 44 45cdef class _AioCall(GrpcCallWrapper): 46 47 def __cinit__(self, AioChannel channel, object deadline, 48 bytes method, CallCredentials call_credentials, object wait_for_ready): 49 init_grpc_aio() 50 self.call = NULL 51 self._channel = channel 52 self._loop = channel.loop 53 self._references = [] 54 self._status = None 55 self._initial_metadata = None 56 self._waiters_status = [] 57 self._waiters_initial_metadata = [] 58 self._done_callbacks = [] 59 self._is_locally_cancelled = False 60 self._deadline = deadline 61 self._send_initial_metadata_flags = _get_send_initial_metadata_flags(wait_for_ready) 62 self._create_grpc_call(deadline, method, call_credentials) 63 64 def __dealloc__(self): 65 if self.call: 66 grpc_call_unref(self.call) 67 shutdown_grpc_aio() 68 69 def _repr(self) -> str: 70 """Assembles the RPC representation string.""" 71 # This needs to be loaded at run time once everything 72 # has been loaded. 73 from grpc import _common 74 75 if not self.done(): 76 return '<{} object>'.format(self.__class__.__name__) 77 78 if self._status.code() is StatusCode.ok: 79 return _OK_CALL_REPRESENTATION.format( 80 self.__class__.__name__, 81 _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[self._status.code()], 82 self._status.details()) 83 else: 84 return _NON_OK_CALL_REPRESENTATION.format( 85 self.__class__.__name__, 86 self._status.details(), 87 _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[self._status.code()], 88 self._status.debug_error_string()) 89 90 def __repr__(self) -> str: 91 return self._repr() 92 93 def __str__(self) -> str: 94 return self._repr() 95 96 cdef void _create_grpc_call(self, 97 object deadline, 98 bytes method, 99 CallCredentials credentials) except *: 100 """Creates the corresponding Core object for this RPC. 101 102 For unary calls, the grpc_call lives shortly and can be destroyed after 103 invoke start_batch. However, if either side is streaming, the grpc_call 104 life span will be longer than one function. So, it would better save it 105 as an instance variable than a stack variable, which reflects its 106 nature in Core. 107 """ 108 cdef grpc_slice method_slice 109 cdef gpr_timespec c_deadline = _timespec_from_time(deadline) 110 cdef grpc_call_error set_credentials_error 111 112 method_slice = grpc_slice_from_copied_buffer( 113 <const char *> method, 114 <size_t> len(method) 115 ) 116 self.call = grpc_channel_create_call( 117 self._channel.channel, 118 NULL, 119 _EMPTY_MASK, 120 global_completion_queue(), 121 method_slice, 122 NULL, 123 c_deadline, 124 NULL 125 ) 126 127 if credentials is not None: 128 set_credentials_error = grpc_call_set_credentials(self.call, credentials.c()) 129 if set_credentials_error != GRPC_CALL_OK: 130 raise InternalError("Credentials couldn't have been set: {0}".format(set_credentials_error)) 131 132 grpc_slice_unref(method_slice) 133 134 cdef void _set_status(self, AioRpcStatus status) except *: 135 cdef list waiters 136 137 # No more waiters should be expected since status has been set. 138 self._status = status 139 140 if self._initial_metadata is None: 141 self._set_initial_metadata(_IMMUTABLE_EMPTY_METADATA) 142 143 for waiter in self._waiters_status: 144 if not waiter.done(): 145 waiter.set_result(None) 146 self._waiters_status = [] 147 148 for callback in self._done_callbacks: 149 callback() 150 151 cdef void _set_initial_metadata(self, tuple initial_metadata) except *: 152 if self._initial_metadata is not None: 153 # Some gRPC calls might end before the initial metadata arrived in 154 # the Call object. That causes this method to be invoked twice: 1. 155 # filled with an empty metadata; 2. updated with the actual user 156 # provided metadata. 157 return 158 159 cdef list waiters 160 161 # No more waiters should be expected since initial metadata has been 162 # set. 163 self._initial_metadata = initial_metadata 164 165 for waiter in self._waiters_initial_metadata: 166 if not waiter.done(): 167 waiter.set_result(None) 168 self._waiters_initial_metadata = [] 169 170 def add_done_callback(self, callback): 171 if self.done(): 172 callback() 173 else: 174 self._done_callbacks.append(callback) 175 176 def time_remaining(self): 177 if self._deadline is None: 178 return None 179 else: 180 return max(0, self._deadline - time.time()) 181 182 def cancel(self, str details): 183 """Cancels the RPC in Core with given RPC status. 184 185 Above abstractions must invoke this method to set Core objects into 186 proper state. 187 """ 188 self._is_locally_cancelled = True 189 190 cdef object details_bytes 191 cdef char *c_details 192 cdef grpc_call_error error 193 194 self._set_status(AioRpcStatus( 195 StatusCode.cancelled, 196 details, 197 None, 198 None, 199 )) 200 201 details_bytes = str_to_bytes(details) 202 self._references.append(details_bytes) 203 c_details = <char *>details_bytes 204 # By implementation, grpc_call_cancel_with_status always return OK 205 error = grpc_call_cancel_with_status( 206 self.call, 207 StatusCode.cancelled, 208 c_details, 209 NULL, 210 ) 211 assert error == GRPC_CALL_OK 212 213 def done(self): 214 """Returns if the RPC call has finished. 215 216 Checks if the status has been provided, either 217 because the RPC finished or because was cancelled.. 218 219 Returns: 220 True if the RPC can be considered finished. 221 """ 222 return self._status is not None 223 224 def cancelled(self): 225 """Returns if the RPC was cancelled. 226 227 Returns: 228 True if the RPC was cancelled. 229 """ 230 if not self.done(): 231 return False 232 233 return self._status.code() == StatusCode.cancelled 234 235 async def status(self): 236 """Returns the status of the RPC call. 237 238 It returns the finshed status of the RPC. If the RPC 239 has not finished yet this function will wait until the RPC 240 gets finished. 241 242 Returns: 243 Finished status of the RPC as an AioRpcStatus object. 244 """ 245 if self._status is not None: 246 return self._status 247 248 future = self._loop.create_future() 249 self._waiters_status.append(future) 250 await future 251 252 return self._status 253 254 def is_ok(self): 255 """Returns if the RPC is ended with ok.""" 256 return self.done() and self._status.code() == StatusCode.ok 257 258 async def initial_metadata(self): 259 """Returns the initial metadata of the RPC call. 260 261 If the initial metadata has not been received yet this function will 262 wait until the RPC gets finished. 263 264 Returns: 265 The tuple object with the initial metadata. 266 """ 267 if self._initial_metadata is not None: 268 return self._initial_metadata 269 270 future = self._loop.create_future() 271 self._waiters_initial_metadata.append(future) 272 await future 273 274 return self._initial_metadata 275 276 def is_locally_cancelled(self): 277 """Returns if the RPC was cancelled locally. 278 279 Returns: 280 True when was cancelled locally, False when was cancelled remotelly or 281 is still ongoing. 282 """ 283 if self._is_locally_cancelled: 284 return True 285 286 return False 287 288 def set_internal_error(self, str error_str): 289 self._set_status(AioRpcStatus( 290 StatusCode.internal, 291 'Internal error from Core', 292 (), 293 error_str, 294 )) 295 296 async def unary_unary(self, 297 bytes request, 298 tuple outbound_initial_metadata, 299 object context = None): 300 """Performs a unary unary RPC. 301 302 Args: 303 request: the serialized requests in bytes. 304 outbound_initial_metadata: optional outbound metadata. 305 context: instrumentation context. 306 """ 307 cdef tuple ops 308 309 cdef SendInitialMetadataOperation initial_metadata_op = SendInitialMetadataOperation( 310 outbound_initial_metadata, 311 self._send_initial_metadata_flags) 312 cdef SendMessageOperation send_message_op = SendMessageOperation(request, _EMPTY_FLAGS) 313 cdef SendCloseFromClientOperation send_close_op = SendCloseFromClientOperation(_EMPTY_FLAGS) 314 cdef ReceiveInitialMetadataOperation receive_initial_metadata_op = ReceiveInitialMetadataOperation(_EMPTY_FLAGS) 315 cdef ReceiveMessageOperation receive_message_op = ReceiveMessageOperation(_EMPTY_FLAGS) 316 cdef ReceiveStatusOnClientOperation receive_status_on_client_op = ReceiveStatusOnClientOperation(_EMPTY_FLAGS) 317 318 if context is not None: 319 set_instrumentation_context_on_call_aio(self, context) 320 ops = (initial_metadata_op, send_message_op, send_close_op, 321 receive_initial_metadata_op, receive_message_op, 322 receive_status_on_client_op) 323 324 # Executes all operations in one batch. 325 # Might raise CancelledError, handling it in Python UnaryUnaryCall. 326 await execute_batch(self, 327 ops, 328 self._loop) 329 330 self._set_initial_metadata(receive_initial_metadata_op.initial_metadata()) 331 332 cdef grpc_status_code code 333 code = receive_status_on_client_op.code() 334 335 self._set_status(AioRpcStatus( 336 code, 337 receive_status_on_client_op.details(), 338 receive_status_on_client_op.trailing_metadata(), 339 receive_status_on_client_op.error_string(), 340 )) 341 342 if code == StatusCode.ok: 343 return receive_message_op.message() 344 else: 345 return None 346 347 async def _handle_status_once_received(self): 348 """Handles the status sent by peer once received.""" 349 cdef ReceiveStatusOnClientOperation op = ReceiveStatusOnClientOperation(_EMPTY_FLAGS) 350 cdef tuple ops = (op,) 351 await execute_batch(self, ops, self._loop) 352 353 # Halts if the RPC is locally cancelled 354 if self._is_locally_cancelled: 355 return 356 357 self._set_status(AioRpcStatus( 358 op.code(), 359 op.details(), 360 op.trailing_metadata(), 361 op.error_string(), 362 )) 363 364 async def receive_serialized_message(self): 365 """Receives one single raw message in bytes.""" 366 cdef bytes received_message 367 368 # Receives a message. Returns None when failed: 369 # * EOF, no more messages to read; 370 # * The client application cancels; 371 # * The server sends final status. 372 received_message = await _receive_message( 373 self, 374 self._loop 375 ) 376 if received_message is not None: 377 return received_message 378 else: 379 return EOF 380 381 async def send_serialized_message(self, bytes message): 382 """Sends one single raw message in bytes.""" 383 await _send_message(self, 384 message, 385 None, 386 False, 387 self._loop) 388 389 async def send_receive_close(self): 390 """Half close the RPC on the client-side.""" 391 cdef SendCloseFromClientOperation op = SendCloseFromClientOperation(_EMPTY_FLAGS) 392 cdef tuple ops = (op,) 393 await execute_batch(self, ops, self._loop) 394 395 async def initiate_unary_stream(self, 396 bytes request, 397 tuple outbound_initial_metadata, 398 object context = None): 399 """Implementation of the start of a unary-stream call.""" 400 # Peer may prematurely end this RPC at any point. We need a corutine 401 # that watches if the server sends the final status. 402 status_task = self._loop.create_task(self._handle_status_once_received()) 403 404 cdef tuple outbound_ops 405 cdef Operation initial_metadata_op = SendInitialMetadataOperation( 406 outbound_initial_metadata, 407 self._send_initial_metadata_flags) 408 cdef Operation send_message_op = SendMessageOperation( 409 request, 410 _EMPTY_FLAGS) 411 cdef Operation send_close_op = SendCloseFromClientOperation( 412 _EMPTY_FLAGS) 413 414 if context is not None: 415 set_instrumentation_context_on_call_aio(self, context) 416 outbound_ops = ( 417 initial_metadata_op, 418 send_message_op, 419 send_close_op, 420 ) 421 422 try: 423 # Sends out the request message. 424 await execute_batch(self, 425 outbound_ops, 426 self._loop) 427 428 # Receives initial metadata. 429 self._set_initial_metadata( 430 await _receive_initial_metadata(self, 431 self._loop), 432 ) 433 except ExecuteBatchError as batch_error: 434 # Core should explain why this batch failed 435 await status_task 436 437 async def stream_unary(self, 438 tuple outbound_initial_metadata, 439 object metadata_sent_observer, 440 object context = None): 441 """Actual implementation of the complete unary-stream call. 442 443 Needs to pay extra attention to the raise mechanism. If we want to 444 propagate the final status exception, then we have to raise it. 445 Othersize, it would end normally and raise `StopAsyncIteration()`. 446 """ 447 try: 448 # Sends out initial_metadata ASAP. 449 await _send_initial_metadata(self, 450 outbound_initial_metadata, 451 self._send_initial_metadata_flags, 452 self._loop) 453 # Notify upper level that sending messages are allowed now. 454 metadata_sent_observer() 455 456 # Receives initial metadata. 457 self._set_initial_metadata( 458 await _receive_initial_metadata(self, self._loop) 459 ) 460 except ExecuteBatchError: 461 # Core should explain why this batch failed 462 await self._handle_status_once_received() 463 464 # Allow upper layer to proceed only if the status is set 465 metadata_sent_observer() 466 return None 467 468 cdef tuple inbound_ops 469 cdef ReceiveMessageOperation receive_message_op = ReceiveMessageOperation(_EMPTY_FLAGS) 470 cdef ReceiveStatusOnClientOperation receive_status_on_client_op = ReceiveStatusOnClientOperation(_EMPTY_FLAGS) 471 472 if context is not None: 473 set_instrumentation_context_on_call_aio(self, context) 474 inbound_ops = (receive_message_op, receive_status_on_client_op) 475 476 # Executes all operations in one batch. 477 await execute_batch(self, 478 inbound_ops, 479 self._loop) 480 481 cdef grpc_status_code code 482 code = receive_status_on_client_op.code() 483 484 self._set_status(AioRpcStatus( 485 code, 486 receive_status_on_client_op.details(), 487 receive_status_on_client_op.trailing_metadata(), 488 receive_status_on_client_op.error_string(), 489 )) 490 491 if code == StatusCode.ok: 492 return receive_message_op.message() 493 else: 494 return None 495 496 async def initiate_stream_stream(self, 497 tuple outbound_initial_metadata, 498 object metadata_sent_observer, 499 object context = None): 500 """Actual implementation of the complete stream-stream call. 501 502 Needs to pay extra attention to the raise mechanism. If we want to 503 propagate the final status exception, then we have to raise it. 504 Othersize, it would end normally and raise `StopAsyncIteration()`. 505 """ 506 # Peer may prematurely end this RPC at any point. We need a corutine 507 # that watches if the server sends the final status. 508 status_task = self._loop.create_task(self._handle_status_once_received()) 509 510 if context is not None: 511 set_instrumentation_context_on_call_aio(self, context) 512 513 try: 514 # Sends out initial_metadata ASAP. 515 await _send_initial_metadata(self, 516 outbound_initial_metadata, 517 self._send_initial_metadata_flags, 518 self._loop) 519 # Notify upper level that sending messages are allowed now. 520 metadata_sent_observer() 521 522 # Receives initial metadata. 523 self._set_initial_metadata( 524 await _receive_initial_metadata(self, self._loop) 525 ) 526 except ExecuteBatchError as batch_error: 527 # Core should explain why this batch failed 528 await status_task 529 530 # Allow upper layer to proceed only if the status is set 531 metadata_sent_observer()