xref: /aosp_15_r20/external/grpc-grpc/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2018 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# distutils: language=c++
15
16from libc cimport string
17from cython.operator cimport dereference
18
19from cpython cimport Py_INCREF, Py_DECREF
20
21import atexit
22import errno
23import sys
24
25gevent_hub = None
26g_gevent_pool = None
27g_gevent_threadpool = None
28g_gevent_activated = False
29
30
31cdef queue[void*] g_greenlets_to_run
32cdef condition_variable g_greenlets_cv
33cdef mutex g_greenlets_mu
34cdef bint g_shutdown_greenlets_to_run_queue = False
35cdef int g_channel_count = 0
36
37
38cdef _submit_to_greenlet_queue(object cb, tuple args):
39  cdef tuple to_call = (cb,) + args
40  cdef unique_lock[mutex]* lk
41  Py_INCREF(to_call)
42  with nogil:
43    lk = new unique_lock[mutex](g_greenlets_mu)
44    g_greenlets_to_run.push(<void*>(to_call))
45    del lk
46    g_greenlets_cv.notify_all()
47
48
49cpdef void gevent_increment_channel_count():
50  global g_channel_count
51  cdef int old_channel_count
52  with nogil:
53    lk = new unique_lock[mutex](g_greenlets_mu)
54    old_channel_count = g_channel_count
55    g_channel_count += 1
56    del lk
57  if old_channel_count == 0:
58    run_spawn_greenlets()
59
60
61cpdef void gevent_decrement_channel_count():
62  global g_channel_count
63  with nogil:
64    lk = new unique_lock[mutex](g_greenlets_mu)
65    g_channel_count -= 1
66    if g_channel_count == 0:
67      g_greenlets_cv.notify_all()
68    del lk
69
70
71cdef object await_next_greenlet():
72  cdef unique_lock[mutex]* lk
73  with nogil:
74    # Cython doesn't allow us to do proper stack allocations, so we can't take
75    # advantage of RAII.
76    lk = new unique_lock[mutex](g_greenlets_mu)
77    while not g_shutdown_greenlets_to_run_queue and g_channel_count != 0:
78      if not g_greenlets_to_run.empty():
79        break
80      g_greenlets_cv.wait(dereference(lk))
81  if g_channel_count == 0:
82    del lk
83    return None
84  if g_shutdown_greenlets_to_run_queue:
85    del lk
86    return None
87  cdef object to_call = <object>g_greenlets_to_run.front()
88  Py_DECREF(to_call)
89  g_greenlets_to_run.pop()
90  del lk
91  return to_call
92
93def spawn_greenlets():
94  while True:
95    to_call = g_gevent_threadpool.apply(await_next_greenlet, ())
96    if to_call is None:
97      break
98    fn = to_call[0]
99    args = to_call[1:]
100    fn(*args)
101
102def run_spawn_greenlets():
103  g_gevent_pool.spawn(spawn_greenlets)
104
105def shutdown_await_next_greenlet():
106  global g_shutdown_greenlets_to_run_queue
107  cdef unique_lock[mutex]* lk
108  with nogil:
109    lk = new unique_lock[mutex](g_greenlets_mu)
110    g_shutdown_greenlets_to_run_queue = True
111  del lk
112  g_greenlets_cv.notify_all()
113
114def init_grpc_gevent():
115  # Lazily import gevent
116  global gevent_hub
117  global g_gevent_threadpool
118  global g_gevent_activated
119  global g_interrupt_check_period_ms
120  global g_gevent_pool
121
122  import gevent
123  import gevent.pool
124
125  gevent_hub = gevent.hub
126  g_gevent_threadpool = gevent_hub.get_hub().threadpool
127
128  g_gevent_activated = True
129  g_interrupt_check_period_ms = 2000
130
131  g_gevent_pool = gevent.pool.Group()
132
133
134  set_async_callback_func(_submit_to_greenlet_queue)
135
136  # TODO: Document how this all works.
137  atexit.register(shutdown_await_next_greenlet)
138