xref: /aosp_15_r20/external/grpc-grpc/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from grpc import _observability
16
# Message template for call errors that carry no further explanation; it is
# formatted with the integer value of the grpc_call_error code.
_INTERNAL_CALL_ERROR_MESSAGE_FORMAT = (
    'Internal gRPC call error %d. '
    'Please report to https://github.com/grpc/grpc/issues')
20
21
cdef str _call_error_metadata(metadata):
  """Builds the error message for a call rejected over invalid metadata.

  Args:
    metadata: The metadata object that was rejected. Any value is accepted; it
      is rendered into the message with `%s`.

  Returns:
    A str naming the offending metadata.
  """
  # Wrap the value in a one-element tuple: a bare tuple on the right-hand side
  # of `%` is unpacked as the argument list, which raises TypeError unless it
  # happens to hold exactly one item (and then prints only that item).
  return 'metadata was invalid: %s' % (metadata,)
24
25
cdef str _call_error_no_metadata(c_call_error):
  """Formats the generic internal-error message for a call error code.

  Args:
    c_call_error: The call error code to embed in the message.

  Returns:
    The internal call error message with the code filled in.
  """
  message = _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % c_call_error
  return message
28
29
cdef str _call_error(c_call_error, metadata):
  """Chooses the error message matching a failed call error code.

  Args:
    c_call_error: The call error code reported for the failed operation.
    metadata: The metadata of the call, named in the message when the code is
      GRPC_CALL_ERROR_INVALID_METADATA.

  Returns:
    The metadata-specific message for an invalid-metadata error, otherwise the
    generic internal call error message.
  """
  if c_call_error != GRPC_CALL_ERROR_INVALID_METADATA:
    return _call_error_no_metadata(c_call_error)
  return _call_error_metadata(metadata)
35
36
cdef _check_call_error_no_metadata(c_call_error):
  """Returns an error message for a call error code, or None on success.

  Args:
    c_call_error: The call error code to inspect.

  Returns:
    None when the code is GRPC_CALL_OK, otherwise the generic internal call
    error message with the code filled in.
  """
  if c_call_error == GRPC_CALL_OK:
    return None
  return _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % c_call_error
42
43
cdef _check_and_raise_call_error_no_metadata(c_call_error):
  """Raises ValueError unless the call error code is GRPC_CALL_OK.

  Args:
    c_call_error: The call error code to inspect.

  Raises:
    ValueError: If the code denotes a failure; the message is the generic
      internal call error message.
  """
  message = _check_call_error_no_metadata(c_call_error)
  if message is None:
    return
  raise ValueError(message)
48
49
cdef _check_call_error(c_call_error, metadata):
  """Returns an error message for a call error code, or None on success.

  Args:
    c_call_error: The call error code to inspect.
    metadata: The metadata of the call, named in the message when the code is
      GRPC_CALL_ERROR_INVALID_METADATA.

  Returns:
    The metadata-specific message for an invalid-metadata error; otherwise
    None for GRPC_CALL_OK or the generic internal call error message.
  """
  if c_call_error != GRPC_CALL_ERROR_INVALID_METADATA:
    return _check_call_error_no_metadata(c_call_error)
  return _call_error_metadata(metadata)
55
56
cdef void _raise_call_error_no_metadata(c_call_error) except *:
  """Unconditionally raises ValueError describing a call error code.

  Args:
    c_call_error: The call error code to report.

  Raises:
    ValueError: Always, carrying the generic internal call error message.
  """
  message = _call_error_no_metadata(c_call_error)
  raise ValueError(message)
59
60
cdef void _raise_call_error(c_call_error, metadata) except *:
  """Unconditionally raises ValueError describing a failed call operation.

  Args:
    c_call_error: The call error code to report.
    metadata: The metadata of the call, named in the message when the code is
      GRPC_CALL_ERROR_INVALID_METADATA.

  Raises:
    ValueError: Always, carrying the message chosen by `_call_error`.
  """
  message = _call_error(c_call_error, metadata)
  raise ValueError(message)
63
64
cdef _destroy_c_completion_queue(grpc_completion_queue *c_completion_queue):
  """Shuts down and then destroys a gRPC Core completion queue.

  Args:
    c_completion_queue: The completion queue to tear down. It must not be used
      again after this call.
  """
  # Shutdown is requested first and the queue is destroyed immediately after.
  grpc_completion_queue_shutdown(c_completion_queue)
  grpc_completion_queue_destroy(c_completion_queue)
68
69
cdef class _CallState:
  """Bookkeeping for a single RPC.

  `due` holds the batch-operation tags that have been started for the call and
  whose completion events have not been processed yet.
  """

  def __cinit__(self):
    self.due = set()

  cdef void maybe_delete_call_tracer(self) except *:
    """Deletes the observability call tracer if one was attached."""
    if self.call_tracer_capsule:
      _observability.delete_call_tracer(self.call_tracer_capsule)

  cdef void maybe_set_client_call_tracer_on_call(self, bytes method_name, bytes target) except *:
    """Attaches a client call tracer to the call when observability is on.

    Nothing is attached when `method_name` contains one of the excluded
    service names, or when no enabled observability plugin is available.

    Args:
      method_name: The name of the RPC method being invoked.
      target: The target of the channel the call is made on.
    """
    # TODO(xuanwn): use channel args to exclude those metrics.
    for excluded_service in _observability._SERVICES_TO_EXCLUDE:
      if excluded_service in method_name:
        return
    with _observability.get_plugin() as plugin:
      if not plugin or not plugin.observability_enabled:
        return
      tracer_capsule = plugin.create_client_call_tracer(method_name, target)
      tracer_ptr = cpython.PyCapsule_GetPointer(tracer_capsule, CLIENT_CALL_TRACER)
      _set_call_tracer(self.c_call, tracer_ptr)
      # Keep the capsule so the tracer can be deleted once the call is done.
      self.call_tracer_capsule = tracer_capsule
92
cdef class _ChannelState:
  """Mutable state shared between a Channel and the calls made on it.

  The functions in this file read and update these fields while holding
  `condition`.
  """

  def __cinit__(self, target):
    self.target = target
    # Lifecycle: the channel starts open; `closed_reason` is filled in on close.
    self.open = True
    self.closed_reason = None
    self.condition = threading.Condition()
    # Outstanding work: tag -> call state for integrated calls, the call states
    # of segregated calls, and the tags of pending connectivity watches.
    self.integrated_call_states = {}
    self.segregated_call_states = set()
    self.connectivity_due = set()
103
cdef class CallHandle:
  """Wraps the handle returned by registering a method on a channel."""

  def __cinit__(self, _ChannelState channel_state, object method):
    # Take an explicit extra reference on `method`; it is released again in
    # __dealloc__.
    cpython.Py_INCREF(method)
    self.method = method
    # Note that since we always pass None for host, the second-to-last
    # parameter of grpc_channel_register_call is set to a fixed NULL value.
    self.c_call_handle = grpc_channel_register_call(
      channel_state.c_channel, <const char *>method, NULL, NULL)

  def __dealloc__(self):
    cpython.Py_DECREF(self.method)

  @property
  def call_handle(self):
    """The registered call handle pointer, as a Python integer."""
    return cpython.PyLong_FromVoidPtr(self.c_call_handle)
121
122
123
cdef tuple _operate(grpc_call *c_call, object operations, object user_tag):
  """Starts a batch of operations on a call.

  Args:
    c_call: The call on which to start the batch.
    operations: The operations making up the batch.
    user_tag: The object to associate with the batch.

  Returns:
    A two-tuple of the grpc_call_error returned by gRPC Core and the
    _BatchOperationTag created for the batch. The tag is only registered with
    the completion queue when the error is GRPC_CALL_OK.
  """
  cdef grpc_call_error c_call_error
  cdef _BatchOperationTag tag = _BatchOperationTag(user_tag, operations, None)
  tag.prepare()
  # gRPC Core is handed a raw pointer to the tag, so take an explicit
  # reference to keep the tag alive while Core holds on to it (presumably
  # released when the completion event is later taken off the queue).
  cpython.Py_INCREF(tag)
  with nogil:
    c_call_error = grpc_call_start_batch(
        c_call, tag.c_ops, tag.c_nops, <cpython.PyObject *>tag, NULL)
  if c_call_error != GRPC_CALL_OK:
    # A rejected batch never yields a completion event, so nothing would ever
    # release the reference taken above; drop it here to avoid leaking the tag.
    # NOTE(review): whatever tag.prepare() allocated may also need releasing
    # on this path - confirm against _BatchOperationTag.
    cpython.Py_DECREF(tag)
  return c_call_error, tag
133
134
cdef object _operate_from_integrated_call(
    _ChannelState channel_state, _CallState call_state, object operations,
    object user_tag):
  """Starts a further batch of operations on an integrated call.

  Args:
    channel_state: The state of the channel the call belongs to.
    call_state: The state of the call to operate on.
    operations: The operations making up the batch.
    user_tag: The object to associate with the batch.

  Returns:
    True if the batch was started, or False if the call no longer has any
    operations due and therefore nothing was started.

  Raises:
    ValueError: If gRPC Core rejects the batch.
  """
  cdef grpc_call_error c_call_error
  cdef _BatchOperationTag tag
  with channel_state.condition:
    if not call_state.due:
      return False
    c_call_error, tag = _operate(call_state.c_call, operations, user_tag)
    if c_call_error != GRPC_CALL_OK:
      _raise_call_error_no_metadata(c_call_error)
    call_state.due.add(tag)
    # Record which call the tag belongs to so its completion event can be
    # routed back to this call state.
    channel_state.integrated_call_states[tag] = call_state
    return True
151
152
cdef object _operate_from_segregated_call(
    _ChannelState channel_state, _CallState call_state, object operations,
    object user_tag):
  """Starts a further batch of operations on a segregated call.

  Args:
    channel_state: The state of the channel the call belongs to.
    call_state: The state of the call to operate on.
    operations: The operations making up the batch.
    user_tag: The object to associate with the batch.

  Returns:
    True if the batch was started, or False if the call no longer has any
    operations due and therefore nothing was started.

  Raises:
    ValueError: If gRPC Core rejects the batch.
  """
  cdef grpc_call_error c_call_error
  cdef _BatchOperationTag tag
  with channel_state.condition:
    if not call_state.due:
      return False
    c_call_error, tag = _operate(call_state.c_call, operations, user_tag)
    if c_call_error != GRPC_CALL_OK:
      _raise_call_error_no_metadata(c_call_error)
    call_state.due.add(tag)
    return True
168
169
cdef _cancel(
    _ChannelState channel_state, _CallState call_state, grpc_status_code code,
    str details):
  """Cancels a call with the given status if it still has operations due.

  Args:
    channel_state: The state of the channel the call belongs to.
    call_state: The state of the call to cancel.
    code: The status code to cancel the call with.
    details: The status details to cancel the call with.

  Raises:
    ValueError: If gRPC Core reports an error for the cancellation.
  """
  cdef grpc_call_error c_call_error
  with channel_state.condition:
    # A call with nothing due is left alone.
    if not call_state.due:
      return
    c_call_error = grpc_call_cancel_with_status(
        call_state.c_call, code, _encode(details), NULL)
    _check_and_raise_call_error_no_metadata(c_call_error)
179
180
cdef _next_call_event(
    _ChannelState channel_state, grpc_completion_queue *c_completion_queue,
    on_success, on_failure, deadline):
  """Waits for the next event on a completion queue and dispatches its tag.

  When an event is obtained, `on_success` is called with the event's tag while
  the channel condition is held, after which all waiters on the condition are
  notified. When polling raises instead (for example an exception raised from
  a signal handler, which can only occur on the main thread), `on_failure` is
  called with no arguments and the exception is re-raised.

  Args:
    channel_state: The state for the channel on which the RPC is running.
    c_completion_queue: The CQ which will be polled.
    on_success: A callable object to be invoked upon successful receipt of a
      tag from the CQ.
    on_failure: A callable object to be invoked in case a Python exception is
      raised during polling, or None if no cleanup is needed.
    deadline: The point after which the RPC will time out.

  Returns:
    The event taken from the completion queue.
  """
  try:
    completed_tag, call_event = _latent_event(c_completion_queue, deadline)
  # NOTE(rbellevi): This broad except enables us to clean up resources before
  # propagating any exceptions raised by signal handlers to the application.
  except:
    if on_failure is not None:
      on_failure()
    raise
  with channel_state.condition:
    on_success(completed_tag)
    channel_state.condition.notify_all()
  return call_event
213
214
215# TODO(https://github.com/grpc/grpc/issues/14569): This could be a lot simpler.
cdef void _call(
    _ChannelState channel_state, _CallState call_state,
    grpc_completion_queue *c_completion_queue, on_success, int flags, method,
    host, object deadline, CallCredentials credentials,
    object operationses_and_user_tags, object metadata,
    object context, object registered_call_handle) except *:
  """Creates a call on the channel and starts its initial operations.

  The whole procedure runs while holding the channel_state condition.

  Args:
    channel_state: A _ChannelState with its "open" attribute set to True. RPCs
      may not be invoked on a closed channel.
    call_state: An empty _CallState to be altered (specifically assigned a
      c_call and having its due set populated) if the RPC invocation is
      successful.
    c_completion_queue: A grpc_completion_queue to be used for the call's
      operations.
    on_success: A behavior invoked, while the channel_state condition is held,
      with the set of tags of the operations that were successfully started
      for the call.
    flags: Flags to be passed to gRPC Core as part of call creation.
    method: The fully-qualified name of the RPC method being invoked.
    host: A "host" string to be passed to gRPC Core as part of call creation,
      or None.
    deadline: A float for the deadline of the RPC, or None if the RPC is to have
      no deadline.
    credentials: A _CallCredentials for the RPC or None.
    operationses_and_user_tags: A sequence of length-two sequences the first
      element of which is a sequence of Operations and the second element of
      which is an object to be used as a tag. A SendInitialMetadataOperation
      must be present in the first element of this value.
    metadata: The metadata for this call; only used for error reporting here.
    context: Context object for distributed tracing, or None.
    registered_call_handle: An int representing the call handle of the method, or
      None if the method is not registered.

  Raises:
    ValueError: If the channel is closed, if the call credentials cannot be
      set, or if any batch of operations cannot be started.
  """
  cdef grpc_slice method_slice
  cdef grpc_slice host_slice
  cdef grpc_slice *host_slice_ptr
  cdef grpc_call_credentials *c_call_credentials
  cdef grpc_call_error c_call_error
  with channel_state.condition:
    if not channel_state.open:
      raise ValueError('Cannot invoke RPC: %s' % channel_state.closed_reason)

    # Create the call, either from a pre-registered handle or by method name.
    method_slice = _slice_from_bytes(method)
    if host is None:
      host_slice_ptr = NULL
    else:
      host_slice = _slice_from_bytes(host)
      host_slice_ptr = &host_slice
    if registered_call_handle:
      call_state.c_call = grpc_channel_create_registered_call(
          channel_state.c_channel, NULL, flags,
          c_completion_queue, cpython.PyLong_AsVoidPtr(registered_call_handle),
          _timespec_from_time(deadline), NULL)
    else:
      call_state.c_call = grpc_channel_create_call(
          channel_state.c_channel, NULL, flags,
          c_completion_queue, method_slice, host_slice_ptr,
          _timespec_from_time(deadline), NULL)
    grpc_slice_unref(method_slice)
    if host_slice_ptr:
      grpc_slice_unref(host_slice)

    # Attach tracing and credentials before any operation is started.
    call_state.maybe_set_client_call_tracer_on_call(method, channel_state.target)
    if context is not None:
      set_census_context_on_call(call_state, context)
    if credentials is not None:
      c_call_credentials = credentials.c()
      c_call_error = grpc_call_set_credentials(
          call_state.c_call, c_call_credentials)
      grpc_call_credentials_release(c_call_credentials)
      if c_call_error != GRPC_CALL_OK:
        #TODO(xuanwn): Expand the scope of nogil
        with nogil:
          grpc_call_unref(call_state.c_call)
        call_state.c_call = NULL
        _raise_call_error_no_metadata(c_call_error)

    # Start every requested batch; the first failure cancels and releases the
    # call and aborts with an exception.
    started_tags = set()
    for operations, user_tag in operationses_and_user_tags:
      c_call_error, tag = _operate(call_state.c_call, operations, user_tag)
      if c_call_error != GRPC_CALL_OK:
        grpc_call_cancel(call_state.c_call, NULL)
        #TODO(xuanwn): Expand the scope of nogil
        with nogil:
          grpc_call_unref(call_state.c_call)
        call_state.c_call = NULL
        _raise_call_error(c_call_error, metadata)
      started_tags.add(tag)

    call_state.due.update(started_tags)
    on_success(started_tags)
310
311
cdef void _process_integrated_call_tag(
    _ChannelState state, _BatchOperationTag tag) except *:
  """Retires a completed tag of an integrated call.

  The tag is removed from the channel's tag-to-call mapping and from the
  call's due set. When it was the call's last due tag, the call is released
  and its call tracer (if any) is deleted.

  Args:
    state: The state of the channel the call belongs to.
    tag: The tag whose completion event was received.
  """
  cdef _CallState call_state = state.integrated_call_states.pop(tag)
  call_state.due.remove(tag)
  if call_state.due:
    # Other batches are still outstanding; the call must stay alive.
    return
  with nogil:
    grpc_call_unref(call_state.c_call)
  call_state.c_call = NULL
  call_state.maybe_delete_call_tracer()
321
cdef class IntegratedCall:
  """Handle on an RPC whose events arrive on the channel's call queue."""

  def __cinit__(self, _ChannelState channel_state, _CallState call_state):
    self._channel_state = channel_state
    self._call_state = call_state

  def operate(self, operations, tag):
    """Starts a batch of operations on the call.

    Returns:
      True if the batch was started, False if the call has nothing due.
    """
    return _operate_from_integrated_call(
        self._channel_state, self._call_state, operations, tag)

  def cancel(self, code, details):
    """Cancels the call with the given status code and details."""
    _cancel(self._channel_state, self._call_state, code, details)
334
335
cdef IntegratedCall _integrated_call(
    _ChannelState state, int flags, method, host, object deadline,
    object metadata, CallCredentials credentials, operationses_and_user_tags,
    object context, object registered_call_handle):
  """Invokes an RPC that shares the channel's call completion queue.

  Arguments other than `state` are forwarded unchanged to `_call`.

  Returns:
    An IntegratedCall wrapping the newly started call.
  """
  call_state = _CallState()

  def _register_started_tags(started_tags):
    # Map every started tag to this call so that events taken off the shared
    # queue can be routed back to it.
    state.integrated_call_states.update(
        dict.fromkeys(started_tags, call_state))

  _call(
      state, call_state, state.c_call_completion_queue,
      _register_started_tags, flags, method, host, deadline, credentials,
      operationses_and_user_tags, metadata, context, registered_call_handle)

  return IntegratedCall(state, call_state)
352
353
cdef object _process_segregated_call_tag(
    _ChannelState state, _CallState call_state,
    grpc_completion_queue *c_completion_queue, _BatchOperationTag tag):
  """Retires a completed tag of a segregated call.

  Args:
    state: The state of the channel the call belongs to.
    call_state: The state of the call the tag belongs to.
    c_completion_queue: The completion queue dedicated to the call.
    tag: The tag whose completion event was received.

  Returns:
    True if this was the call's last due tag, in which case the call, its
    call tracer and its completion queue have been released; False otherwise.
  """
  call_state.due.remove(tag)
  if call_state.due:
    return False
  #TODO(xuanwn): Expand the scope of nogil
  with nogil:
    grpc_call_unref(call_state.c_call)
  call_state.c_call = NULL
  call_state.maybe_delete_call_tracer()
  state.segregated_call_states.remove(call_state)
  _destroy_c_completion_queue(c_completion_queue)
  return True
369
370
cdef class SegregatedCall:
  """Handle on an RPC that owns a completion queue of its own."""

  def __cinit__(self, _ChannelState channel_state, _CallState call_state):
    self._channel_state = channel_state
    self._call_state = call_state

  def operate(self, operations, tag):
    """Starts a batch of operations on the call.

    Returns:
      True if the batch was started, False if the call has nothing due.
    """
    return _operate_from_segregated_call(
        self._channel_state, self._call_state, operations, tag)

  def cancel(self, code, details):
    """Cancels the call with the given status code and details."""
    _cancel(self._channel_state, self._call_state, code, details)

  def next_event(self):
    """Blocks until the call's next event and returns it.

    If polling raises, the call and its completion queue are released before
    the exception propagates.
    """
    def _retire_tag(tag):
      _process_segregated_call_tag(
        self._channel_state, self._call_state, self._c_completion_queue, tag)
    def _abandon_call():
      # NOTE(review): unlike the normal completion path, the call tracer is
      # not deleted here - confirm whether that is intentional.
      self._call_state.due.clear()
      with nogil:
        grpc_call_unref(self._call_state.c_call)
      self._call_state.c_call = NULL
      self._channel_state.segregated_call_states.remove(self._call_state)
      _destroy_c_completion_queue(self._c_completion_queue)
    return _next_call_event(
        self._channel_state, self._c_completion_queue, _retire_tag,
        _abandon_call, None)
397
398
cdef SegregatedCall _segregated_call(
    _ChannelState state, int flags, method, host, object deadline,
    object metadata, CallCredentials credentials, operationses_and_user_tags,
    object context, object registered_call_handle):
  """Invokes an RPC that is given a completion queue of its own.

  Arguments other than `state` are forwarded unchanged to `_call`.

  Returns:
    A SegregatedCall wrapping the newly started call and its queue.

  Raises:
    ValueError: If the channel is closed or the call cannot be started.
  """
  cdef _CallState call_state = _CallState()
  cdef SegregatedCall segregated_call
  cdef grpc_completion_queue *c_completion_queue

  def _track_call(started_tags):
    state.segregated_call_states.add(call_state)

  with state.condition:
    if not state.open:
      raise ValueError('Cannot invoke RPC on closed channel!')
    c_completion_queue = (grpc_completion_queue_create_for_next(NULL))

  try:
    _call(
        state, call_state, c_completion_queue, _track_call, flags, method,
        host, deadline, credentials, operationses_and_user_tags, metadata,
        context, registered_call_handle)
  except:
    # The queue was created solely for this call; do not leak it on failure.
    _destroy_c_completion_queue(c_completion_queue)
    raise

  segregated_call = SegregatedCall(state, call_state)
  segregated_call._c_completion_queue = c_completion_queue
  return segregated_call
428
429
cdef object _watch_connectivity_state(
    _ChannelState state, grpc_connectivity_state last_observed_state,
    object deadline):
  """Registers a connectivity watch and waits for a watch event.

  Args:
    state: The state of the channel to watch.
    last_observed_state: The connectivity state last seen by the caller.
    deadline: The deadline handed to gRPC Core for the watch.

  Returns:
    The event taken from the channel's connectivity completion queue.

  Raises:
    ValueError: If the channel is closed.
  """
  cdef _ConnectivityTag tag = _ConnectivityTag(object())
  with state.condition:
    if not state.open:
      raise ValueError('Cannot monitor channel state: %s' % state.closed_reason)
    # gRPC Core is handed a raw pointer to the tag; hold a reference for it.
    cpython.Py_INCREF(tag)
    grpc_channel_watch_connectivity_state(
        state.c_channel, last_observed_state, _timespec_from_time(deadline),
        state.c_connectivity_completion_queue, <cpython.PyObject *>tag)
    state.connectivity_due.add(tag)
  # Poll without holding the condition.
  completed_tag, event = _latent_event(
      state.c_connectivity_completion_queue, None)
  with state.condition:
    state.connectivity_due.remove(completed_tag)
    state.condition.notify_all()
  return event
449
450
cdef _close(Channel channel, grpc_status_code code, object details,
    drain_calls):
  """Closes a channel, cancelling its calls and releasing Core resources.

  The first caller marks the channel closed, cancels every integrated and
  segregated call with the given status, waits for outstanding work to
  finish, and then destroys the completion queues and the channel. Any other
  caller just waits until that teardown has completed.

  Args:
    channel: The Channel to close.
    code: The status code with which outstanding calls are cancelled.
    details: The status details for the cancellation; also recorded as the
      channel's closed reason.
    drain_calls: If true, this function itself pulls and dispatches call
      events until all calls and connectivity watches have drained;
      otherwise it waits on the channel condition for integrated calls and
      connectivity watches to finish.
  """
  cdef _ChannelState state = channel._state
  cdef _CallState call_state
  encoded_details = _encode(details)
  with state.condition:
    if not state.open:
      # Another call to close already completed in the past or is currently
      # being executed in another thread.
      while state.c_channel != NULL:
        state.condition.wait()
      return

    state.open = False
    state.closed_reason = details
    for call_state in set(state.integrated_call_states.values()):
      grpc_call_cancel_with_status(
          call_state.c_call, code, encoded_details, NULL)
    for call_state in state.segregated_call_states:
      grpc_call_cancel_with_status(
          call_state.c_call, code, encoded_details, NULL)
    # TODO(https://github.com/grpc/grpc/issues/3064): Cancel connectivity
    # watching.

    if drain_calls:
      while not _calls_drained(state):
        event = channel.next_call_event()
        # Queue timeouts carry nothing to dispatch; just poll again.
        if event.completion_type != CompletionType.queue_timeout:
          event.tag(event)
    else:
      while state.integrated_call_states:
        state.condition.wait()
      while state.connectivity_due:
        state.condition.wait()

    _destroy_c_completion_queue(state.c_call_completion_queue)
    _destroy_c_completion_queue(state.c_connectivity_completion_queue)
    grpc_channel_destroy(state.c_channel)
    # Clearing c_channel is what concurrent closers are waiting to observe.
    state.c_channel = NULL
    grpc_shutdown()
    state.condition.notify_all()
492
493
cdef _calls_drained(_ChannelState state):
  """Returns True when the channel has no outstanding calls or watches."""
  return (not state.integrated_call_states and
          not state.segregated_call_states and
          not state.connectivity_due)
497
cdef class Channel:
  """A gRPC Core channel together with the completion queues it polls."""

  def __cinit__(
      self, bytes target, object arguments,
      ChannelCredentials channel_credentials):
    """Creates the underlying channel.

    Args:
      target: The target to connect to.
      arguments: An iterable of channel arguments, or None for no arguments.
      channel_credentials: The credentials for the channel, or None to create
        an insecure channel.
    """
    arguments = () if arguments is None else tuple(arguments)
    fork_handlers_and_grpc_init()
    self._state = _ChannelState(target)
    self._state.c_call_completion_queue = (
        grpc_completion_queue_create_for_next(NULL))
    self._state.c_connectivity_completion_queue = (
        grpc_completion_queue_create_for_next(NULL))
    self._arguments = arguments
    cdef _ChannelArgs channel_args = _ChannelArgs(arguments)
    c_channel_credentials = (
        channel_credentials.c() if channel_credentials is not None
        else grpc_insecure_credentials_create())
    self._state.c_channel = grpc_channel_create(
        <char *>target, c_channel_credentials, channel_args.c_args())
    self._registered_call_handles = {}
    grpc_channel_credentials_release(c_channel_credentials)

  def target(self):
    """Returns the channel's target, as reported by gRPC Core, as bytes."""
    cdef char *c_target
    with self._state.condition:
      c_target = grpc_channel_get_target(self._state.c_channel)
      # Copy into a Python bytes object before freeing the C string.
      target = <bytes>c_target
      gpr_free(c_target)
      return target

  def integrated_call(
      self, int flags, method, host, object deadline, object metadata,
      CallCredentials credentials, operationses_and_tags,
      object context = None, object registered_call_handle = None):
    """Invokes an RPC whose events are delivered via `next_call_event`.

    Returns:
      An IntegratedCall for the started RPC.
    """
    return _integrated_call(
        self._state, flags, method, host, deadline, metadata, credentials,
        operationses_and_tags, context, registered_call_handle)

  def next_call_event(self):
    """Returns the next event from the channel's call completion queue.

    When fork support is enabled the poll uses a one-second deadline;
    otherwise it blocks without a deadline.
    """
    def on_success(tag):
      if tag is not None:
        _process_integrated_call_tag(self._state, tag)
    if is_fork_support_enabled():
      queue_deadline = time.time() + 1.0
    else:
      queue_deadline = None
    # NOTE(gnossen): It is acceptable for on_failure to be None here because
    # failure conditions can only ever happen on the main thread and this
    # method is only ever invoked on the channel spin thread.
    return _next_call_event(self._state, self._state.c_call_completion_queue,
                            on_success, None, queue_deadline)

  def segregated_call(
      self, int flags, method, host, object deadline, object metadata,
      CallCredentials credentials, operationses_and_tags,
      object context = None, object registered_call_handle = None):
    """Invokes an RPC that is given a completion queue of its own.

    Returns:
      A SegregatedCall for the started RPC.
    """
    return _segregated_call(
        self._state, flags, method, host, deadline, metadata, credentials,
        operationses_and_tags, context, registered_call_handle)

  def check_connectivity_state(self, bint try_to_connect):
    """Returns the channel's current connectivity state.

    Args:
      try_to_connect: Passed through to gRPC Core along with the channel.

    Raises:
      ValueError: If the channel is closed.
    """
    with self._state.condition:
      if self._state.open:
        return grpc_channel_check_connectivity_state(
            self._state.c_channel, try_to_connect)
      else:
        raise ValueError('Cannot invoke RPC: %s' % self._state.closed_reason)

  def watch_connectivity_state(
      self, grpc_connectivity_state last_observed_state, object deadline):
    """Registers a connectivity watch and returns the resulting event."""
    return _watch_connectivity_state(self._state, last_observed_state, deadline)

  def close(self, code, details):
    """Closes the channel, cancelling outstanding calls with the status."""
    _close(self, code, details, False)

  def close_on_fork(self, code, details):
    """Closes the channel, draining call events from the calling thread."""
    _close(self, code, details, True)

  def get_registered_call_handle(self, method):
    """
    Gets or registers a call handle for a method.

    This method is not thread-safe.

    Args:
      method: Required, the method name for the RPC.

    Returns:
      The registered call handle pointer in the form of a Python Long.
    """
    # Membership is tested on the dict itself; no keys() view is needed.
    if method not in self._registered_call_handles:
      self._registered_call_handles[method] = CallHandle(self._state, method)
    return self._registered_call_handles[method].call_handle
591