1# Copyright 2020 The gRPC Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import socket 16 17cdef gpr_timespec _GPR_INF_FUTURE = gpr_inf_future(GPR_CLOCK_REALTIME) 18cdef float _POLL_AWAKE_INTERVAL_S = 0.2 19 20# This bool indicates if the event loop impl can monitor a given fd, or has 21# loop.add_reader method. 22cdef bint _has_fd_monitoring = True 23 24IF UNAME_SYSNAME == "Windows": 25 cdef void _unified_socket_write(int fd) nogil: 26 win_socket_send(<WIN_SOCKET>fd, b"1", 1, 0) 27ELSE: 28 from posix cimport unistd 29 30 cdef void _unified_socket_write(int fd) nogil: 31 unistd.write(fd, b"1", 1) 32 33 34def _handle_callback_wrapper(CallbackWrapper callback_wrapper, int success): 35 CallbackWrapper.functor_run(callback_wrapper.c_functor(), success) 36 37 38cdef class BaseCompletionQueue: 39 40 cdef grpc_completion_queue* c_ptr(self): 41 return self._cq 42 43 44cdef class _BoundEventLoop: 45 46 def __cinit__(self, object loop, object read_socket, object handler): 47 global _has_fd_monitoring 48 self.loop = loop 49 self.read_socket = read_socket 50 reader_function = functools.partial( 51 handler, 52 loop 53 ) 54 # NOTE(lidiz) There isn't a way to cleanly pre-check if fd monitoring 55 # support is available or not. Checking the event loop policy is not 56 # good enough. The application can has its own loop implementation, or 57 # uses different types of event loops (e.g., 1 Proactor, 3 Selectors). 58 if _has_fd_monitoring: 59 try: 60 self.loop.add_reader(self.read_socket, reader_function) 61 self._has_reader = True 62 except NotImplementedError: 63 _has_fd_monitoring = False 64 self._has_reader = False 65 66 def close(self): 67 if self.loop: 68 if self._has_reader: 69 self.loop.remove_reader(self.read_socket) 70 71 72cdef class PollerCompletionQueue(BaseCompletionQueue): 73 74 def __cinit__(self): 75 self._cq = grpc_completion_queue_create_for_next(NULL) 76 self._shutdown = False 77 self._poller_thread = threading.Thread(target=self._poll_wrapper, daemon=True) 78 self._poller_thread.start() 79 80 self._read_socket, self._write_socket = socket.socketpair() 81 self._write_fd = self._write_socket.fileno() 82 self._loops = {} 83 84 # The read socket might be read by multiple threads. But only one of them will 85 # read the 1 byte sent by the poller thread. This setting is essential to allow 86 # multiple loops in multiple threads bound to the same poller. 87 self._read_socket.setblocking(False) 88 89 self._queue = cpp_event_queue() 90 91 def bind_loop(self, object loop): 92 if loop in self._loops: 93 return 94 else: 95 self._loops[loop] = _BoundEventLoop(loop, self._read_socket, self._handle_events) 96 97 cdef void _poll(self) nogil: 98 cdef grpc_event event 99 cdef CallbackContext *context 100 101 while not self._shutdown: 102 event = grpc_completion_queue_next(self._cq, 103 _GPR_INF_FUTURE, 104 NULL) 105 106 if event.type == GRPC_QUEUE_TIMEOUT: 107 with gil: 108 raise AssertionError("Core should not return GRPC_QUEUE_TIMEOUT!") 109 elif event.type == GRPC_QUEUE_SHUTDOWN: 110 self._shutdown = True 111 else: 112 self._queue_mutex.lock() 113 self._queue.push(event) 114 self._queue_mutex.unlock() 115 if _has_fd_monitoring: 116 _unified_socket_write(self._write_fd) 117 else: 118 with gil: 119 # Event loops can be paused or killed at any time. So, 120 # instead of deligate to any thread, the polling thread 121 # should handle the distribution of the event. 122 self._handle_events(None) 123 124 def _poll_wrapper(self): 125 with nogil: 126 self._poll() 127 128 cdef shutdown(self): 129 # Removes the socket hook from loops 130 for loop in self._loops: 131 self._loops.get(loop).close() 132 133 # TODO(https://github.com/grpc/grpc/issues/22365) perform graceful shutdown 134 grpc_completion_queue_shutdown(self._cq) 135 while not self._shutdown: 136 self._poller_thread.join(timeout=_POLL_AWAKE_INTERVAL_S) 137 grpc_completion_queue_destroy(self._cq) 138 139 # Clean up socket resources 140 self._read_socket.close() 141 self._write_socket.close() 142 143 def _handle_events(self, object context_loop): 144 cdef bytes data 145 if _has_fd_monitoring: 146 # If fd monitoring is working, clean the socket without blocking. 147 data = self._read_socket.recv(1) 148 cdef grpc_event event 149 cdef CallbackContext *context 150 151 while True: 152 self._queue_mutex.lock() 153 if self._queue.empty(): 154 self._queue_mutex.unlock() 155 break 156 else: 157 event = self._queue.front() 158 self._queue.pop() 159 self._queue_mutex.unlock() 160 161 context = <CallbackContext *>event.tag 162 loop = <object>context.loop 163 if loop is context_loop: 164 # Executes callbacks: complete the future 165 CallbackWrapper.functor_run( 166 <grpc_completion_queue_functor *>event.tag, 167 event.success 168 ) 169 else: 170 loop.call_soon_threadsafe( 171 _handle_callback_wrapper, 172 <CallbackWrapper>context.callback_wrapper, 173 event.success 174 ) 175