1# Copyright 2018 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14# distutils: language=c++ 15 16from libc cimport string 17from cython.operator cimport dereference 18 19from cpython cimport Py_INCREF, Py_DECREF 20 21import atexit 22import errno 23import sys 24 25gevent_hub = None 26g_gevent_pool = None 27g_gevent_threadpool = None 28g_gevent_activated = False 29 30 31cdef queue[void*] g_greenlets_to_run 32cdef condition_variable g_greenlets_cv 33cdef mutex g_greenlets_mu 34cdef bint g_shutdown_greenlets_to_run_queue = False 35cdef int g_channel_count = 0 36 37 38cdef _submit_to_greenlet_queue(object cb, tuple args): 39 cdef tuple to_call = (cb,) + args 40 cdef unique_lock[mutex]* lk 41 Py_INCREF(to_call) 42 with nogil: 43 lk = new unique_lock[mutex](g_greenlets_mu) 44 g_greenlets_to_run.push(<void*>(to_call)) 45 del lk 46 g_greenlets_cv.notify_all() 47 48 49cpdef void gevent_increment_channel_count(): 50 global g_channel_count 51 cdef int old_channel_count 52 with nogil: 53 lk = new unique_lock[mutex](g_greenlets_mu) 54 old_channel_count = g_channel_count 55 g_channel_count += 1 56 del lk 57 if old_channel_count == 0: 58 run_spawn_greenlets() 59 60 61cpdef void gevent_decrement_channel_count(): 62 global g_channel_count 63 with nogil: 64 lk = new unique_lock[mutex](g_greenlets_mu) 65 g_channel_count -= 1 66 if g_channel_count == 0: 67 g_greenlets_cv.notify_all() 68 del lk 69 70 71cdef object await_next_greenlet(): 72 cdef unique_lock[mutex]* lk 73 with nogil: 74 # Cython doesn't allow us to do proper stack allocations, so we can't take 75 # advantage of RAII. 76 lk = new unique_lock[mutex](g_greenlets_mu) 77 while not g_shutdown_greenlets_to_run_queue and g_channel_count != 0: 78 if not g_greenlets_to_run.empty(): 79 break 80 g_greenlets_cv.wait(dereference(lk)) 81 if g_channel_count == 0: 82 del lk 83 return None 84 if g_shutdown_greenlets_to_run_queue: 85 del lk 86 return None 87 cdef object to_call = <object>g_greenlets_to_run.front() 88 Py_DECREF(to_call) 89 g_greenlets_to_run.pop() 90 del lk 91 return to_call 92 93def spawn_greenlets(): 94 while True: 95 to_call = g_gevent_threadpool.apply(await_next_greenlet, ()) 96 if to_call is None: 97 break 98 fn = to_call[0] 99 args = to_call[1:] 100 fn(*args) 101 102def run_spawn_greenlets(): 103 g_gevent_pool.spawn(spawn_greenlets) 104 105def shutdown_await_next_greenlet(): 106 global g_shutdown_greenlets_to_run_queue 107 cdef unique_lock[mutex]* lk 108 with nogil: 109 lk = new unique_lock[mutex](g_greenlets_mu) 110 g_shutdown_greenlets_to_run_queue = True 111 del lk 112 g_greenlets_cv.notify_all() 113 114def init_grpc_gevent(): 115 # Lazily import gevent 116 global gevent_hub 117 global g_gevent_threadpool 118 global g_gevent_activated 119 global g_interrupt_check_period_ms 120 global g_gevent_pool 121 122 import gevent 123 import gevent.pool 124 125 gevent_hub = gevent.hub 126 g_gevent_threadpool = gevent_hub.get_hub().threadpool 127 128 g_gevent_activated = True 129 g_interrupt_check_period_ms = 2000 130 131 g_gevent_pool = gevent.pool.Group() 132 133 134 set_async_callback_func(_submit_to_greenlet_queue) 135 136 # TODO: Document how this all works. 137 atexit.register(shutdown_await_next_greenlet) 138