xref: /aosp_15_r20/external/grpc-grpc/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2020 The gRPC Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import socket
16
17cdef gpr_timespec _GPR_INF_FUTURE = gpr_inf_future(GPR_CLOCK_REALTIME)
18cdef float _POLL_AWAKE_INTERVAL_S = 0.2
19
20# This bool indicates if the event loop impl can monitor a given fd, or has
21# loop.add_reader method.
22cdef bint _has_fd_monitoring = True
23
24IF UNAME_SYSNAME == "Windows":
25    cdef void _unified_socket_write(int fd) nogil:
26        win_socket_send(<WIN_SOCKET>fd, b"1", 1, 0)
27ELSE:
28    from posix cimport unistd
29
30    cdef void _unified_socket_write(int fd) nogil:
31        unistd.write(fd, b"1", 1)
32
33
34def _handle_callback_wrapper(CallbackWrapper callback_wrapper, int success):
35    CallbackWrapper.functor_run(callback_wrapper.c_functor(), success)
36
37
38cdef class BaseCompletionQueue:
39
40    cdef grpc_completion_queue* c_ptr(self):
41        return self._cq
42
43
44cdef class _BoundEventLoop:
45
46    def __cinit__(self, object loop, object read_socket, object handler):
47        global _has_fd_monitoring
48        self.loop = loop
49        self.read_socket = read_socket
50        reader_function = functools.partial(
51            handler,
52            loop
53        )
54        # NOTE(lidiz) There isn't a way to cleanly pre-check if fd monitoring
55        # support is available or not. Checking the event loop policy is not
56        # good enough. The application can has its own loop implementation, or
57        # uses different types of event loops (e.g., 1 Proactor, 3 Selectors).
58        if _has_fd_monitoring:
59            try:
60                self.loop.add_reader(self.read_socket, reader_function)
61                self._has_reader = True
62            except NotImplementedError:
63                _has_fd_monitoring = False
64                self._has_reader = False
65
66    def close(self):
67        if self.loop:
68            if self._has_reader:
69                self.loop.remove_reader(self.read_socket)
70
71
72cdef class PollerCompletionQueue(BaseCompletionQueue):
73
74    def __cinit__(self):
75        self._cq = grpc_completion_queue_create_for_next(NULL)
76        self._shutdown = False
77        self._poller_thread = threading.Thread(target=self._poll_wrapper, daemon=True)
78        self._poller_thread.start()
79
80        self._read_socket, self._write_socket = socket.socketpair()
81        self._write_fd = self._write_socket.fileno()
82        self._loops = {}
83
84        # The read socket might be read by multiple threads. But only one of them will
85        # read the 1 byte sent by the poller thread. This setting is essential to allow
86        # multiple loops in multiple threads bound to the same poller.
87        self._read_socket.setblocking(False)
88
89        self._queue = cpp_event_queue()
90
91    def bind_loop(self, object loop):
92        if loop in self._loops:
93            return
94        else:
95            self._loops[loop] = _BoundEventLoop(loop, self._read_socket, self._handle_events)
96
97    cdef void _poll(self) nogil:
98        cdef grpc_event event
99        cdef CallbackContext *context
100
101        while not self._shutdown:
102            event = grpc_completion_queue_next(self._cq,
103                                               _GPR_INF_FUTURE,
104                                               NULL)
105
106            if event.type == GRPC_QUEUE_TIMEOUT:
107                with gil:
108                    raise AssertionError("Core should not return GRPC_QUEUE_TIMEOUT!")
109            elif event.type == GRPC_QUEUE_SHUTDOWN:
110                self._shutdown = True
111            else:
112                self._queue_mutex.lock()
113                self._queue.push(event)
114                self._queue_mutex.unlock()
115                if _has_fd_monitoring:
116                    _unified_socket_write(self._write_fd)
117                else:
118                    with gil:
119                        # Event loops can be paused or killed at any time. So,
120                        # instead of deligate to any thread, the polling thread
121                        # should handle the distribution of the event.
122                        self._handle_events(None)
123
124    def _poll_wrapper(self):
125        with nogil:
126            self._poll()
127
128    cdef shutdown(self):
129        # Removes the socket hook from loops
130        for loop in self._loops:
131            self._loops.get(loop).close()
132
133        # TODO(https://github.com/grpc/grpc/issues/22365) perform graceful shutdown
134        grpc_completion_queue_shutdown(self._cq)
135        while not self._shutdown:
136            self._poller_thread.join(timeout=_POLL_AWAKE_INTERVAL_S)
137        grpc_completion_queue_destroy(self._cq)
138
139        # Clean up socket resources
140        self._read_socket.close()
141        self._write_socket.close()
142
143    def _handle_events(self, object context_loop):
144        cdef bytes data
145        if _has_fd_monitoring:
146            # If fd monitoring is working, clean the socket without blocking.
147            data = self._read_socket.recv(1)
148        cdef grpc_event event
149        cdef CallbackContext *context
150
151        while True:
152            self._queue_mutex.lock()
153            if self._queue.empty():
154                self._queue_mutex.unlock()
155                break
156            else:
157                event = self._queue.front()
158                self._queue.pop()
159                self._queue_mutex.unlock()
160
161            context = <CallbackContext *>event.tag
162            loop = <object>context.loop
163            if loop is context_loop:
164                # Executes callbacks: complete the future
165                CallbackWrapper.functor_run(
166                    <grpc_completion_queue_functor *>event.tag,
167                    event.success
168                )
169            else:
170                loop.call_soon_threadsafe(
171                    _handle_callback_wrapper,
172                    <CallbackWrapper>context.callback_wrapper,
173                    event.success
174                )
175