1# Copyright 2015 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15from grpc import _observability 16 17_INTERNAL_CALL_ERROR_MESSAGE_FORMAT = ( 18 'Internal gRPC call error %d. ' + 19 'Please report to https://github.com/grpc/grpc/issues') 20 21 22cdef str _call_error_metadata(metadata): 23 return 'metadata was invalid: %s' % metadata 24 25 26cdef str _call_error_no_metadata(c_call_error): 27 return _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % c_call_error 28 29 30cdef str _call_error(c_call_error, metadata): 31 if c_call_error == GRPC_CALL_ERROR_INVALID_METADATA: 32 return _call_error_metadata(metadata) 33 else: 34 return _call_error_no_metadata(c_call_error) 35 36 37cdef _check_call_error_no_metadata(c_call_error): 38 if c_call_error != GRPC_CALL_OK: 39 return _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % c_call_error 40 else: 41 return None 42 43 44cdef _check_and_raise_call_error_no_metadata(c_call_error): 45 error = _check_call_error_no_metadata(c_call_error) 46 if error is not None: 47 raise ValueError(error) 48 49 50cdef _check_call_error(c_call_error, metadata): 51 if c_call_error == GRPC_CALL_ERROR_INVALID_METADATA: 52 return _call_error_metadata(metadata) 53 else: 54 return _check_call_error_no_metadata(c_call_error) 55 56 57cdef void _raise_call_error_no_metadata(c_call_error) except *: 58 raise ValueError(_call_error_no_metadata(c_call_error)) 59 60 61cdef void _raise_call_error(c_call_error, metadata) except *: 62 raise ValueError(_call_error(c_call_error, metadata)) 63 64 65cdef _destroy_c_completion_queue(grpc_completion_queue *c_completion_queue): 66 grpc_completion_queue_shutdown(c_completion_queue) 67 grpc_completion_queue_destroy(c_completion_queue) 68 69 70cdef class _CallState: 71 72 def __cinit__(self): 73 self.due = set() 74 75 cdef void maybe_delete_call_tracer(self) except *: 76 if not self.call_tracer_capsule: 77 return 78 _observability.delete_call_tracer(self.call_tracer_capsule) 79 80 cdef void maybe_set_client_call_tracer_on_call(self, bytes method_name, bytes target) except *: 81 # TODO(xuanwn): use channel args to exclude those metrics. 82 for exclude_prefix in _observability._SERVICES_TO_EXCLUDE: 83 if exclude_prefix in method_name: 84 return 85 with _observability.get_plugin() as plugin: 86 if not (plugin and plugin.observability_enabled): 87 return 88 capsule = plugin.create_client_call_tracer(method_name, target) 89 capsule_ptr = cpython.PyCapsule_GetPointer(capsule, CLIENT_CALL_TRACER) 90 _set_call_tracer(self.c_call, capsule_ptr) 91 self.call_tracer_capsule = capsule 92 93cdef class _ChannelState: 94 95 def __cinit__(self, target): 96 self.target = target 97 self.condition = threading.Condition() 98 self.open = True 99 self.integrated_call_states = {} 100 self.segregated_call_states = set() 101 self.connectivity_due = set() 102 self.closed_reason = None 103 104cdef class CallHandle: 105 106 def __cinit__(self, _ChannelState channel_state, object method): 107 self.method = method 108 cpython.Py_INCREF(method) 109 # Note that since we always pass None for host, we set the 110 # second-to-last parameter of grpc_channel_register_call to a fixed 111 # NULL value. 112 self.c_call_handle = grpc_channel_register_call( 113 channel_state.c_channel, <const char *>method, NULL, NULL) 114 115 def __dealloc__(self): 116 cpython.Py_DECREF(self.method) 117 118 @property 119 def call_handle(self): 120 return cpython.PyLong_FromVoidPtr(self.c_call_handle) 121 122 123 124cdef tuple _operate(grpc_call *c_call, object operations, object user_tag): 125 cdef grpc_call_error c_call_error 126 cdef _BatchOperationTag tag = _BatchOperationTag(user_tag, operations, None) 127 tag.prepare() 128 cpython.Py_INCREF(tag) 129 with nogil: 130 c_call_error = grpc_call_start_batch( 131 c_call, tag.c_ops, tag.c_nops, <cpython.PyObject *>tag, NULL) 132 return c_call_error, tag 133 134 135cdef object _operate_from_integrated_call( 136 _ChannelState channel_state, _CallState call_state, object operations, 137 object user_tag): 138 cdef grpc_call_error c_call_error 139 cdef _BatchOperationTag tag 140 with channel_state.condition: 141 if call_state.due: 142 c_call_error, tag = _operate(call_state.c_call, operations, user_tag) 143 if c_call_error == GRPC_CALL_OK: 144 call_state.due.add(tag) 145 channel_state.integrated_call_states[tag] = call_state 146 return True 147 else: 148 _raise_call_error_no_metadata(c_call_error) 149 else: 150 return False 151 152 153cdef object _operate_from_segregated_call( 154 _ChannelState channel_state, _CallState call_state, object operations, 155 object user_tag): 156 cdef grpc_call_error c_call_error 157 cdef _BatchOperationTag tag 158 with channel_state.condition: 159 if call_state.due: 160 c_call_error, tag = _operate(call_state.c_call, operations, user_tag) 161 if c_call_error == GRPC_CALL_OK: 162 call_state.due.add(tag) 163 return True 164 else: 165 _raise_call_error_no_metadata(c_call_error) 166 else: 167 return False 168 169 170cdef _cancel( 171 _ChannelState channel_state, _CallState call_state, grpc_status_code code, 172 str details): 173 cdef grpc_call_error c_call_error 174 with channel_state.condition: 175 if call_state.due: 176 c_call_error = grpc_call_cancel_with_status( 177 call_state.c_call, code, _encode(details), NULL) 178 _check_and_raise_call_error_no_metadata(c_call_error) 179 180 181cdef _next_call_event( 182 _ChannelState channel_state, grpc_completion_queue *c_completion_queue, 183 on_success, on_failure, deadline): 184 """Block on the next event out of the completion queue. 185 186 On success, `on_success` will be invoked with the tag taken from the CQ. 187 In the case of a failure due to an exception raised in a signal handler, 188 `on_failure` will be invoked with no arguments. Note that this situation 189 can only occur on the main thread. 190 191 Args: 192 channel_state: The state for the channel on which the RPC is running. 193 c_completion_queue: The CQ which will be polled. 194 on_success: A callable object to be invoked upon successful receipt of a 195 tag from the CQ. 196 on_failure: A callable object to be invoked in case a Python exception is 197 raised from a signal handler during polling. 198 deadline: The point after which the RPC will time out. 199 """ 200 try: 201 tag, event = _latent_event(c_completion_queue, deadline) 202 # NOTE(rbellevi): This broad except enables us to clean up resources before 203 # propagating any exceptions raised by signal handlers to the application. 204 except: 205 if on_failure is not None: 206 on_failure() 207 raise 208 else: 209 with channel_state.condition: 210 on_success(tag) 211 channel_state.condition.notify_all() 212 return event 213 214 215# TODO(https://github.com/grpc/grpc/issues/14569): This could be a lot simpler. 216cdef void _call( 217 _ChannelState channel_state, _CallState call_state, 218 grpc_completion_queue *c_completion_queue, on_success, int flags, method, 219 host, object deadline, CallCredentials credentials, 220 object operationses_and_user_tags, object metadata, 221 object context, object registered_call_handle) except *: 222 """Invokes an RPC. 223 224 Args: 225 channel_state: A _ChannelState with its "open" attribute set to True. RPCs 226 may not be invoked on a closed channel. 227 call_state: An empty _CallState to be altered (specifically assigned a 228 c_call and having its due set populated) if the RPC invocation is 229 successful. 230 c_completion_queue: A grpc_completion_queue to be used for the call's 231 operations. 232 on_success: A behavior to be called if attempting to start operations for 233 the call succeeds. If called the behavior will be called while holding the 234 channel_state condition and passed the tags associated with operations 235 that were successfully started for the call. 236 flags: Flags to be passed to gRPC Core as part of call creation. 237 method: The fully-qualified name of the RPC method being invoked. 238 host: A "host" string to be passed to gRPC Core as part of call creation. 239 deadline: A float for the deadline of the RPC, or None if the RPC is to have 240 no deadline. 241 credentials: A _CallCredentials for the RPC or None. 242 operationses_and_user_tags: A sequence of length-two sequences the first 243 element of which is a sequence of Operations and the second element of 244 which is an object to be used as a tag. A SendInitialMetadataOperation 245 must be present in the first element of this value. 246 metadata: The metadata for this call. 247 context: Context object for distributed tracing. 248 registered_call_handle: An int representing the call handle of the method, or 249 None if the method is not registered. 250 """ 251 cdef grpc_slice method_slice 252 cdef grpc_slice host_slice 253 cdef grpc_slice *host_slice_ptr 254 cdef grpc_call_credentials *c_call_credentials 255 cdef grpc_call_error c_call_error 256 cdef tuple error_and_wrapper_tag 257 cdef _BatchOperationTag wrapper_tag 258 with channel_state.condition: 259 if channel_state.open: 260 method_slice = _slice_from_bytes(method) 261 if host is None: 262 host_slice_ptr = NULL 263 else: 264 host_slice = _slice_from_bytes(host) 265 host_slice_ptr = &host_slice 266 if registered_call_handle: 267 call_state.c_call = grpc_channel_create_registered_call( 268 channel_state.c_channel, NULL, flags, 269 c_completion_queue, cpython.PyLong_AsVoidPtr(registered_call_handle), 270 _timespec_from_time(deadline), NULL) 271 else: 272 call_state.c_call = grpc_channel_create_call( 273 channel_state.c_channel, NULL, flags, 274 c_completion_queue, method_slice, host_slice_ptr, 275 _timespec_from_time(deadline), NULL) 276 grpc_slice_unref(method_slice) 277 if host_slice_ptr: 278 grpc_slice_unref(host_slice) 279 call_state.maybe_set_client_call_tracer_on_call(method, channel_state.target) 280 if context is not None: 281 set_census_context_on_call(call_state, context) 282 if credentials is not None: 283 c_call_credentials = credentials.c() 284 c_call_error = grpc_call_set_credentials( 285 call_state.c_call, c_call_credentials) 286 grpc_call_credentials_release(c_call_credentials) 287 if c_call_error != GRPC_CALL_OK: 288 #TODO(xuanwn): Expand the scope of nogil 289 with nogil: 290 grpc_call_unref(call_state.c_call) 291 call_state.c_call = NULL 292 _raise_call_error_no_metadata(c_call_error) 293 started_tags = set() 294 for operations, user_tag in operationses_and_user_tags: 295 c_call_error, tag = _operate(call_state.c_call, operations, user_tag) 296 if c_call_error == GRPC_CALL_OK: 297 started_tags.add(tag) 298 else: 299 grpc_call_cancel(call_state.c_call, NULL) 300 #TODO(xuanwn): Expand the scope of nogil 301 with nogil: 302 grpc_call_unref(call_state.c_call) 303 call_state.c_call = NULL 304 _raise_call_error(c_call_error, metadata) 305 else: 306 call_state.due.update(started_tags) 307 on_success(started_tags) 308 else: 309 raise ValueError('Cannot invoke RPC: %s' % channel_state.closed_reason) 310 311 312cdef void _process_integrated_call_tag( 313 _ChannelState state, _BatchOperationTag tag) except *: 314 cdef _CallState call_state = state.integrated_call_states.pop(tag) 315 call_state.due.remove(tag) 316 if not call_state.due: 317 with nogil: 318 grpc_call_unref(call_state.c_call) 319 call_state.c_call = NULL 320 call_state.maybe_delete_call_tracer() 321 322cdef class IntegratedCall: 323 324 def __cinit__(self, _ChannelState channel_state, _CallState call_state): 325 self._channel_state = channel_state 326 self._call_state = call_state 327 328 def operate(self, operations, tag): 329 return _operate_from_integrated_call( 330 self._channel_state, self._call_state, operations, tag) 331 332 def cancel(self, code, details): 333 _cancel(self._channel_state, self._call_state, code, details) 334 335 336cdef IntegratedCall _integrated_call( 337 _ChannelState state, int flags, method, host, object deadline, 338 object metadata, CallCredentials credentials, operationses_and_user_tags, 339 object context, object registered_call_handle): 340 call_state = _CallState() 341 342 def on_success(started_tags): 343 for started_tag in started_tags: 344 state.integrated_call_states[started_tag] = call_state 345 346 _call( 347 state, call_state, state.c_call_completion_queue, on_success, flags, 348 method, host, deadline, credentials, operationses_and_user_tags, 349 metadata, context, registered_call_handle) 350 351 return IntegratedCall(state, call_state) 352 353 354cdef object _process_segregated_call_tag( 355 _ChannelState state, _CallState call_state, 356 grpc_completion_queue *c_completion_queue, _BatchOperationTag tag): 357 call_state.due.remove(tag) 358 if not call_state.due: 359 #TODO(xuanwn): Expand the scope of nogil 360 with nogil: 361 grpc_call_unref(call_state.c_call) 362 call_state.c_call = NULL 363 call_state.maybe_delete_call_tracer() 364 state.segregated_call_states.remove(call_state) 365 _destroy_c_completion_queue(c_completion_queue) 366 return True 367 else: 368 return False 369 370 371cdef class SegregatedCall: 372 373 def __cinit__(self, _ChannelState channel_state, _CallState call_state): 374 self._channel_state = channel_state 375 self._call_state = call_state 376 377 def operate(self, operations, tag): 378 return _operate_from_segregated_call( 379 self._channel_state, self._call_state, operations, tag) 380 381 def cancel(self, code, details): 382 _cancel(self._channel_state, self._call_state, code, details) 383 384 def next_event(self): 385 def on_success(tag): 386 _process_segregated_call_tag( 387 self._channel_state, self._call_state, self._c_completion_queue, tag) 388 def on_failure(): 389 self._call_state.due.clear() 390 with nogil: 391 grpc_call_unref(self._call_state.c_call) 392 self._call_state.c_call = NULL 393 self._channel_state.segregated_call_states.remove(self._call_state) 394 _destroy_c_completion_queue(self._c_completion_queue) 395 return _next_call_event( 396 self._channel_state, self._c_completion_queue, on_success, on_failure, None) 397 398 399cdef SegregatedCall _segregated_call( 400 _ChannelState state, int flags, method, host, object deadline, 401 object metadata, CallCredentials credentials, operationses_and_user_tags, 402 object context, object registered_call_handle): 403 cdef _CallState call_state = _CallState() 404 cdef SegregatedCall segregated_call 405 cdef grpc_completion_queue *c_completion_queue 406 407 def on_success(started_tags): 408 state.segregated_call_states.add(call_state) 409 410 with state.condition: 411 if state.open: 412 c_completion_queue = (grpc_completion_queue_create_for_next(NULL)) 413 else: 414 raise ValueError('Cannot invoke RPC on closed channel!') 415 416 try: 417 _call( 418 state, call_state, c_completion_queue, on_success, flags, method, host, 419 deadline, credentials, operationses_and_user_tags, metadata, 420 context, registered_call_handle) 421 except: 422 _destroy_c_completion_queue(c_completion_queue) 423 raise 424 425 segregated_call = SegregatedCall(state, call_state) 426 segregated_call._c_completion_queue = c_completion_queue 427 return segregated_call 428 429 430cdef object _watch_connectivity_state( 431 _ChannelState state, grpc_connectivity_state last_observed_state, 432 object deadline): 433 cdef _ConnectivityTag tag = _ConnectivityTag(object()) 434 with state.condition: 435 if state.open: 436 cpython.Py_INCREF(tag) 437 grpc_channel_watch_connectivity_state( 438 state.c_channel, last_observed_state, _timespec_from_time(deadline), 439 state.c_connectivity_completion_queue, <cpython.PyObject *>tag) 440 state.connectivity_due.add(tag) 441 else: 442 raise ValueError('Cannot monitor channel state: %s' % state.closed_reason) 443 completed_tag, event = _latent_event( 444 state.c_connectivity_completion_queue, None) 445 with state.condition: 446 state.connectivity_due.remove(completed_tag) 447 state.condition.notify_all() 448 return event 449 450 451cdef _close(Channel channel, grpc_status_code code, object details, 452 drain_calls): 453 cdef _ChannelState state = channel._state 454 cdef _CallState call_state 455 encoded_details = _encode(details) 456 with state.condition: 457 if state.open: 458 state.open = False 459 state.closed_reason = details 460 for call_state in set(state.integrated_call_states.values()): 461 grpc_call_cancel_with_status( 462 call_state.c_call, code, encoded_details, NULL) 463 for call_state in state.segregated_call_states: 464 grpc_call_cancel_with_status( 465 call_state.c_call, code, encoded_details, NULL) 466 # TODO(https://github.com/grpc/grpc/issues/3064): Cancel connectivity 467 # watching. 468 469 if drain_calls: 470 while not _calls_drained(state): 471 event = channel.next_call_event() 472 if event.completion_type == CompletionType.queue_timeout: 473 continue 474 event.tag(event) 475 else: 476 while state.integrated_call_states: 477 state.condition.wait() 478 while state.connectivity_due: 479 state.condition.wait() 480 481 _destroy_c_completion_queue(state.c_call_completion_queue) 482 _destroy_c_completion_queue(state.c_connectivity_completion_queue) 483 grpc_channel_destroy(state.c_channel) 484 state.c_channel = NULL 485 grpc_shutdown() 486 state.condition.notify_all() 487 else: 488 # Another call to close already completed in the past or is currently 489 # being executed in another thread. 490 while state.c_channel != NULL: 491 state.condition.wait() 492 493 494cdef _calls_drained(_ChannelState state): 495 return not (state.integrated_call_states or state.segregated_call_states or 496 state.connectivity_due) 497 498cdef class Channel: 499 500 def __cinit__( 501 self, bytes target, object arguments, 502 ChannelCredentials channel_credentials): 503 arguments = () if arguments is None else tuple(arguments) 504 fork_handlers_and_grpc_init() 505 self._state = _ChannelState(target) 506 self._state.c_call_completion_queue = ( 507 grpc_completion_queue_create_for_next(NULL)) 508 self._state.c_connectivity_completion_queue = ( 509 grpc_completion_queue_create_for_next(NULL)) 510 self._arguments = arguments 511 cdef _ChannelArgs channel_args = _ChannelArgs(arguments) 512 c_channel_credentials = ( 513 channel_credentials.c() if channel_credentials is not None 514 else grpc_insecure_credentials_create()) 515 self._state.c_channel = grpc_channel_create( 516 <char *>target, c_channel_credentials, channel_args.c_args()) 517 self._registered_call_handles = {} 518 grpc_channel_credentials_release(c_channel_credentials) 519 520 def target(self): 521 cdef char *c_target 522 with self._state.condition: 523 c_target = grpc_channel_get_target(self._state.c_channel) 524 target = <bytes>c_target 525 gpr_free(c_target) 526 return target 527 528 def integrated_call( 529 self, int flags, method, host, object deadline, object metadata, 530 CallCredentials credentials, operationses_and_tags, 531 object context = None, object registered_call_handle = None): 532 return _integrated_call( 533 self._state, flags, method, host, deadline, metadata, credentials, 534 operationses_and_tags, context, registered_call_handle) 535 536 def next_call_event(self): 537 def on_success(tag): 538 if tag is not None: 539 _process_integrated_call_tag(self._state, tag) 540 if is_fork_support_enabled(): 541 queue_deadline = time.time() + 1.0 542 else: 543 queue_deadline = None 544 # NOTE(gnossen): It is acceptable for on_failure to be None here because 545 # failure conditions can only ever happen on the main thread and this 546 # method is only ever invoked on the channel spin thread. 547 return _next_call_event(self._state, self._state.c_call_completion_queue, 548 on_success, None, queue_deadline) 549 550 def segregated_call( 551 self, int flags, method, host, object deadline, object metadata, 552 CallCredentials credentials, operationses_and_tags, 553 object context = None, object registered_call_handle = None): 554 return _segregated_call( 555 self._state, flags, method, host, deadline, metadata, credentials, 556 operationses_and_tags, context, registered_call_handle) 557 558 def check_connectivity_state(self, bint try_to_connect): 559 with self._state.condition: 560 if self._state.open: 561 return grpc_channel_check_connectivity_state( 562 self._state.c_channel, try_to_connect) 563 else: 564 raise ValueError('Cannot invoke RPC: %s' % self._state.closed_reason) 565 566 def watch_connectivity_state( 567 self, grpc_connectivity_state last_observed_state, object deadline): 568 return _watch_connectivity_state(self._state, last_observed_state, deadline) 569 570 def close(self, code, details): 571 _close(self, code, details, False) 572 573 def close_on_fork(self, code, details): 574 _close(self, code, details, True) 575 576 def get_registered_call_handle(self, method): 577 """ 578 Get or registers a call handler for a method. 579 580 This method is not thread-safe. 581 582 Args: 583 method: Required, the method name for the RPC. 584 585 Returns: 586 The registered call handle pointer in the form of a Python Long. 587 """ 588 if method not in self._registered_call_handles.keys(): 589 self._registered_call_handles[method] = CallHandle(self._state, method) 590 return self._registered_call_handles[method].call_handle 591