xref: /aosp_15_r20/external/grpc-grpc/src/python/grpcio_testing/grpc_testing/_server/_handler.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2017 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import abc
16import threading
17
18import grpc
19from grpc_testing import _common
20
21_CLIENT_INACTIVE = object()
22
23
class Handler(_common.ServerRpcHandler):
    """Abstract view of an in-flight RPC as driven from the test side.

    Extends ``_common.ServerRpcHandler`` with the operations needed to feed
    requests into an RPC and to observe what comes back out of it. The
    concrete implementation in this module is ``_Handler``; the per-method
    descriptions below summarize how that implementation behaves.
    """

    @abc.abstractmethod
    def initial_metadata(self):
        """Return the RPC's initial metadata, waiting until it is available."""
        raise NotImplementedError()

    @abc.abstractmethod
    def add_request(self, request):
        """Queue ``request`` so that it can be consumed by the RPC."""
        raise NotImplementedError()

    @abc.abstractmethod
    def take_response(self):
        """Return the next response of the RPC, waiting until one exists."""
        raise NotImplementedError()

    @abc.abstractmethod
    def requests_closed(self):
        """Signal that no further requests will be added to the RPC."""
        raise NotImplementedError()

    @abc.abstractmethod
    def cancel(self):
        """Cancel the RPC on behalf of the client."""
        raise NotImplementedError()

    @abc.abstractmethod
    def unary_response_termination(self):
        """Wait for termination of a response-unary RPC and return its outcome.

        The concrete implementation returns a 4-tuple of
        ``(response, trailing_metadata, code, details)``.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def stream_response_termination(self):
        """Wait for termination of a response-streaming RPC and return its outcome.

        The concrete implementation returns a 3-tuple of
        ``(trailing_metadata, code, details)``.
        """
        raise NotImplementedError()
52
53
class _Handler(Handler):
    """Condition-guarded, in-memory implementation of ``Handler``.

    All mutable state is read and written while holding ``self._condition``;
    every state change is followed by ``notify_all()`` so that any method
    blocked in ``wait()`` re-evaluates what it is waiting for.

    ``self._code`` doubles as the liveness flag of the RPC:

    * ``None``              -- the RPC is still active;
    * ``_CLIENT_INACTIVE``  -- the RPC was cancelled via ``cancel()``;
    * anything else         -- the status code the RPC terminated with.
    """

    def __init__(self, requests_closed):
        """Initialize an active RPC with no requests or responses queued.

        Args:
          requests_closed: Whether the request side is already closed at
            construction time (i.e. no requests will ever be added).
        """
        self._condition = threading.Condition()
        self._requests = []
        self._requests_closed = requests_closed
        self._initial_metadata = None
        self._responses = []
        self._trailing_metadata = None
        self._code = None
        self._details = None
        # Cached first response, filled lazily by unary_response_termination.
        self._unary_response = None
        # Object with a ``cancel()`` method; installed by
        # set_expiration_future when the RPC has a deadline.
        self._expiration_future = None
        # Replaced with None once the callbacks have been handed off for
        # execution by cancel() or expire().
        self._termination_callbacks = []

    def send_initial_metadata(self, initial_metadata):
        """Record the RPC's initial metadata and wake any waiters."""
        with self._condition:
            self._initial_metadata = initial_metadata
            self._condition.notify_all()

    def take_request(self):
        """Return the next request, blocking until one can be reported.

        Returns:
          ``_common.TERMINATED`` if the RPC has a code (terminated or
          cancelled); otherwise a ``_common.ServerRpcRead`` wrapping the
          oldest queued request; otherwise ``_common.REQUESTS_CLOSED`` once
          the request side is closed and the queue is empty.
        """
        with self._condition:
            while True:
                if self._code is None:
                    if self._requests:
                        request = self._requests.pop(0)
                        self._condition.notify_all()
                        # NOTE(review): the two False flags are presumably
                        # "requests closed" / "terminated" -- confirm against
                        # _common.ServerRpcRead.
                        return _common.ServerRpcRead(request, False, False)
                    elif self._requests_closed:
                        return _common.REQUESTS_CLOSED
                    else:
                        self._condition.wait()
                else:
                    return _common.TERMINATED

    def is_active(self):
        """Return True while the RPC has neither terminated nor been cancelled."""
        with self._condition:
            return self._code is None

    def add_response(self, response):
        """Queue ``response`` for retrieval and wake any waiters."""
        with self._condition:
            self._responses.append(response)
            self._condition.notify_all()

    def send_termination(self, trailing_metadata, code, details):
        """Record the terminal status of the RPC.

        Also cancels the expiration future, if one was installed, since the
        deadline no longer matters once the RPC has terminated.
        """
        with self._condition:
            self._trailing_metadata = trailing_metadata
            self._code = code
            self._details = details
            if self._expiration_future is not None:
                self._expiration_future.cancel()
            self._condition.notify_all()

    def add_termination_callback(self, callback):
        """Register ``callback`` to be run when the RPC is cancelled or expires.

        Returns:
          True if the callback was registered (the RPC is still active),
          False if the RPC already has a code and the callback was dropped.
        """
        with self._condition:
            if self._code is None:
                self._termination_callbacks.append(callback)
                return True
            else:
                return False

    def initial_metadata(self):
        """Return the initial metadata, blocking until it has been sent.

        Raises:
          ValueError: If the RPC has a code but no initial metadata was ever
            recorded.
        """
        with self._condition:
            while True:
                if self._initial_metadata is None:
                    if self._code is None:
                        self._condition.wait()
                    else:
                        raise ValueError(
                            "No initial metadata despite status code!"
                        )
                else:
                    return self._initial_metadata

    def add_request(self, request):
        """Queue ``request`` for ``take_request`` and wake any waiters."""
        with self._condition:
            self._requests.append(request)
            self._condition.notify_all()

    def take_response(self):
        """Return the oldest queued response, blocking until one exists.

        Raises:
          ValueError: If no response is queued and the RPC already has a
            code, so no further response can be expected.
        """
        with self._condition:
            while True:
                if self._responses:
                    response = self._responses.pop(0)
                    self._condition.notify_all()
                    return response
                elif self._code is None:
                    self._condition.wait()
                else:
                    raise ValueError("No more responses!")

    def requests_closed(self):
        """Mark the request side as closed and wake any waiters."""
        with self._condition:
            self._requests_closed = True
            self._condition.notify_all()

    def cancel(self):
        """Cancel the RPC and run its termination callbacks.

        Has no effect if the RPC already has a code (it terminated, expired
        or was cancelled before).
        """
        # Start with "nothing to run": when the RPC already has a code the
        # branch below is skipped, and iterating an unbound local would
        # otherwise raise UnboundLocalError.
        termination_callbacks = ()
        with self._condition:
            if self._code is None:
                self._code = _CLIENT_INACTIVE
                termination_callbacks = self._termination_callbacks
                self._termination_callbacks = None
                if self._expiration_future is not None:
                    self._expiration_future.cancel()
                self._condition.notify_all()
        # Callbacks run after the lock is released so that they may call
        # back into this handler.
        for termination_callback in termination_callbacks:
            termination_callback()

    def unary_response_termination(self):
        """Block until the RPC terminates and return its unary outcome.

        The first queued response (if any) is popped and cached so that
        repeated calls return the same response.

        Returns:
          A ``(response, trailing_metadata, code, details)`` tuple;
          ``response`` is None when no response was ever added.

        Raises:
          ValueError: If the RPC was cancelled via ``cancel()``.
        """
        with self._condition:
            while True:
                if self._code is _CLIENT_INACTIVE:
                    raise ValueError("Huh? Cancelled but wanting status?")
                elif self._code is None:
                    self._condition.wait()
                else:
                    if self._unary_response is None:
                        if self._responses:
                            self._unary_response = self._responses.pop(0)
                    return (
                        self._unary_response,
                        self._trailing_metadata,
                        self._code,
                        self._details,
                    )

    def stream_response_termination(self):
        """Block until the RPC terminates and return its streaming outcome.

        Returns:
          A ``(trailing_metadata, code, details)`` tuple.

        Raises:
          ValueError: If the RPC was cancelled via ``cancel()``.
        """
        with self._condition:
            while True:
                if self._code is _CLIENT_INACTIVE:
                    raise ValueError("Huh? Cancelled but wanting status?")
                elif self._code is None:
                    self._condition.wait()
                else:
                    return self._trailing_metadata, self._code, self._details

    def expire(self):
        """Terminate the RPC with DEADLINE_EXCEEDED and run its callbacks.

        Has no effect if the RPC already has a code, e.g. when the deadline
        timer fires after the RPC terminated or was cancelled.
        """
        # Start with "nothing to run": when the RPC already has a code the
        # branch below is skipped, and iterating an unbound local would
        # otherwise raise UnboundLocalError.
        termination_callbacks = ()
        with self._condition:
            if self._code is None:
                # Fill in initial metadata too, so initial_metadata() does
                # not raise for an RPC that expired before sending any.
                if self._initial_metadata is None:
                    self._initial_metadata = _common.FUSSED_EMPTY_METADATA
                self._trailing_metadata = _common.FUSSED_EMPTY_METADATA
                self._code = grpc.StatusCode.DEADLINE_EXCEEDED
                self._details = "Took too much time!"
                termination_callbacks = self._termination_callbacks
                self._termination_callbacks = None
                self._condition.notify_all()
        # Callbacks run after the lock is released so that they may call
        # back into this handler.
        for termination_callback in termination_callbacks:
            termination_callback()

    def set_expiration_future(self, expiration_future):
        """Install the object whose ``cancel()`` disarms the deadline timer."""
        with self._condition:
            self._expiration_future = expiration_future
206
207
def handler_without_deadline(requests_closed):
    """Build a handler for an RPC that never expires.

    Args:
      requests_closed: Whether the RPC's request side is already closed.

    Returns:
      A new ``_Handler`` with no expiration future installed.
    """
    handler = _Handler(requests_closed)
    return handler
210
211
def handler_with_deadline(requests_closed, time, deadline):
    """Build a handler whose RPC expires at ``deadline``.

    Args:
      requests_closed: Whether the RPC's request side is already closed.
      time: Object providing ``call_at(behavior, time)``; it is asked to
        invoke the handler's ``expire`` method at ``deadline``.
        (Presumably a ``grpc_testing.Time`` -- confirm against callers.)
      deadline: The time value passed through to ``time.call_at``.

    Returns:
      A new ``_Handler`` holding the value returned by ``call_at`` as its
      expiration future, so the timer can be cancelled on termination.
    """
    rpc_handler = _Handler(requests_closed)
    rpc_handler.set_expiration_future(
        time.call_at(rpc_handler.expire, deadline)
    )
    return rpc_handler
217