xref: /aosp_15_r20/external/grpc-grpc/src/python/grpcio_testing/grpc_testing/_channel/_invocation.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2017 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import logging
16import threading
17
18import grpc
19
# Sentinel meaning "no unary response has been read yet". It is compared by
# identity, so it cannot collide with any real response value (including None).
_NOT_YET_OBSERVED = object()
# NOTE(review): basicConfig() runs at import time, so importing this module
# may configure the root logger as a side effect — confirm this is intended.
logging.basicConfig()
# Module-level logger named after this module.
_LOGGER = logging.getLogger(__name__)
23
24
def _cancel(handler):
    """Cancel the RPC behind ``handler`` on behalf of the local caller.

    Args:
      handler: Object exposing ``cancel(code, details)``.

    Returns:
      Whatever ``handler.cancel`` returns for a ``CANCELLED`` status with the
      fixed local-cancellation details string.
    """
    outcome = handler.cancel(grpc.StatusCode.CANCELLED, "Locally cancelled!")
    return outcome
27
28
def _is_active(handler):
    """Report the handler's own view of whether its RPC is still active.

    Args:
      handler: Object exposing ``is_active()``.

    Returns:
      The value returned by ``handler.is_active()``, unchanged.
    """
    active = handler.is_active()
    return active
31
32
def _time_remaining(unused_handler):
    """Placeholder for remaining-time reporting, which is not implemented.

    Args:
      unused_handler: Ignored.

    Raises:
      NotImplementedError: Always.
    """
    raise NotImplementedError
35
36
def _add_callback(handler, callback):
    """Register ``callback`` with ``handler``.

    Args:
      handler: Object exposing ``add_callback(callback)``.
      callback: The callable to hand to the handler.

    Returns:
      The value returned by ``handler.add_callback(callback)``, unchanged.
    """
    registered = handler.add_callback(callback)
    return registered
39
40
def _initial_metadata(handler):
    """Fetch the initial metadata from ``handler``.

    Args:
      handler: Object exposing ``initial_metadata()``.

    Returns:
      The value returned by ``handler.initial_metadata()``, unchanged.
    """
    metadata = handler.initial_metadata()
    return metadata
43
44
def _trailing_metadata(handler):
    """Return the first element of ``handler.termination()``.

    ``handler.termination()`` is unpacked as a three-item sequence of
    (trailing metadata, code, details); only the metadata is returned.
    """
    metadata, _code_ignored, _details_ignored = handler.termination()
    return metadata
48
49
def _code(handler):
    """Return the second element of ``handler.termination()``.

    ``handler.termination()`` is unpacked as a three-item sequence of
    (trailing metadata, code, details); only the code is returned.
    """
    _metadata_ignored, status, _details_ignored = handler.termination()
    return status
53
54
def _details(handler):
    """Return the third element of ``handler.termination()``.

    ``handler.termination()`` is unpacked as a three-item sequence of
    (trailing metadata, code, details); only the details are returned.
    """
    _metadata_ignored, _code_ignored, message = handler.termination()
    return message
58
59
class _Call(grpc.Call):
    """``grpc.Call`` implementation that forwards every query to a handler.

    No state is kept here beyond the handler reference; each method asks the
    handler afresh.
    """

    def __init__(self, handler):
        self._handler = handler

    def cancel(self):
        """Cancel the RPC locally; the handler's return value is discarded."""
        self._handler.cancel(grpc.StatusCode.CANCELLED, "Locally cancelled!")

    def is_active(self):
        """Return ``handler.is_active()``."""
        return self._handler.is_active()

    def time_remaining(self):
        """Not implemented; raises ``NotImplementedError``."""
        return _time_remaining(self._handler)

    def add_callback(self, callback):
        """Register ``callback`` with the handler and return its answer."""
        return self._handler.add_callback(callback)

    def initial_metadata(self):
        """Return ``handler.initial_metadata()``."""
        return self._handler.initial_metadata()

    def trailing_metadata(self):
        """Return the metadata element of ``handler.termination()``."""
        metadata, _, _ = self._handler.termination()
        return metadata

    def code(self):
        """Return the code element of ``handler.termination()``."""
        _, status, _ = self._handler.termination()
        return status

    def details(self):
        """Return the details element of ``handler.termination()``."""
        _, _, message = self._handler.termination()
        return message
87
88
class _RpcErrorCall(grpc.RpcError, grpc.Call):
    """Raisable ``grpc.RpcError`` that also answers ``grpc.Call`` queries.

    All call information is read from the wrapped handler on demand.
    """

    def __init__(self, handler):
        self._handler = handler

    def cancel(self):
        """Cancel the RPC locally; the handler's return value is discarded."""
        self._handler.cancel(grpc.StatusCode.CANCELLED, "Locally cancelled!")

    def is_active(self):
        """Return ``handler.is_active()``."""
        return self._handler.is_active()

    def time_remaining(self):
        """Not implemented; raises ``NotImplementedError``."""
        return _time_remaining(self._handler)

    def add_callback(self, callback):
        """Register ``callback`` with the handler and return its answer."""
        return self._handler.add_callback(callback)

    def initial_metadata(self):
        """Return ``handler.initial_metadata()``."""
        return self._handler.initial_metadata()

    def trailing_metadata(self):
        """Return the metadata element of ``handler.termination()``."""
        metadata, _, _ = self._handler.termination()
        return metadata

    def code(self):
        """Return the code element of ``handler.termination()``."""
        _, status, _ = self._handler.termination()
        return status

    def details(self):
        """Return the details element of ``handler.termination()``."""
        _, _, message = self._handler.termination()
        return message
116
117
def _next(handler):
    """Take one read from ``handler`` and translate it for iterator use.

    Args:
      handler: Object exposing ``take_response()``, whose result has ``code``
        and ``response`` attributes.

    Returns:
      The read's ``response`` when the read carries no status code.

    Raises:
      StopIteration: The read carries ``grpc.StatusCode.OK``.
      _RpcErrorCall: The read carries any other status code.
    """
    outcome = handler.take_response()
    if outcome.code is None:
        return outcome.response
    if outcome.code is grpc.StatusCode.OK:
        raise StopIteration()
    raise _RpcErrorCall(handler)
126
127
class _HandlerExtras(object):
    """Mutable bookkeeping kept next to a handler for future-style calls."""

    def __init__(self):
        # Whether a local cancel has been accepted by the handler.
        self.cancelled = False
        # Cached unary response; the sentinel means none has been read yet.
        self.unary_response = _NOT_YET_OBSERVED
        # Held by the module helpers while they read or update the fields above.
        self.condition = threading.Condition()
133
134
def _with_extras_cancel(handler, extras):
    """Cancel the RPC locally and record success in ``extras``.

    Args:
      handler: Object exposing ``cancel(code, details)``.
      extras: Object with a ``condition`` context manager and a ``cancelled``
        attribute.

    Returns:
      True if ``handler.cancel`` returned a truthy value (in which case
      ``extras.cancelled`` is set to True), otherwise False.
    """
    with extras.condition:
        accepted = handler.cancel(
            grpc.StatusCode.CANCELLED, "Locally cancelled!"
        )
        if not accepted:
            return False
        extras.cancelled = True
        return True
142
143
def _extras_without_cancelled(extras):
    """Read ``extras.cancelled`` while holding ``extras.condition``.

    Args:
      extras: Object with a ``condition`` context manager and a ``cancelled``
        attribute.

    Returns:
      The current value of ``extras.cancelled``.
    """
    with extras.condition:
        was_cancelled = extras.cancelled
    return was_cancelled
147
148
def _running(handler):
    """Answer the future-style "running" query for ``handler``.

    Returns:
      The value returned by ``handler.is_active()``, unchanged.
    """
    active = handler.is_active()
    return active
151
152
def _done(handler):
    """Answer the future-style "done" query for ``handler``.

    Returns:
      The boolean negation of ``handler.is_active()``.
    """
    still_active = handler.is_active()
    return not still_active
155
156
def _with_extras_unary_response(handler, extras):
    """Return the unary response, reading it from ``handler`` at most once.

    The first successful read is cached on ``extras.unary_response`` and later
    calls return that cached value without consulting the handler again.

    Args:
      handler: Object exposing ``take_response()``.
      extras: Object with ``condition`` and ``unary_response`` attributes.

    Returns:
      The response of the RPC.

    Raises:
      _RpcErrorCall: The read taken from the handler carries a status code.
    """
    with extras.condition:
        if extras.unary_response is not _NOT_YET_OBSERVED:
            return extras.unary_response
        outcome = handler.take_response()
        if outcome.code is not None:
            raise _RpcErrorCall(handler)
        extras.unary_response = outcome.response
        return outcome.response
168
169
def _exception(unused_handler):
    """Placeholder for the future-style ``exception`` query.

    Args:
      unused_handler: Ignored.

    Raises:
      NotImplementedError: Always.
    """
    not_supported = NotImplementedError("TODO!")
    raise not_supported
172
173
def _traceback(unused_handler):
    """Placeholder for the future-style ``traceback`` query.

    Args:
      unused_handler: Ignored.

    Raises:
      NotImplementedError: Always.
    """
    not_supported = NotImplementedError("TODO!")
    raise not_supported
176
177
def _add_done_callback(handler, callback, future):
    """Arrange for ``callback(future)`` to be invoked.

    A zero-argument adapter is offered to ``handler.add_callback``. If the
    handler returns a falsy value, ``callback(future)`` is invoked right away
    instead.

    Args:
      handler: Object exposing ``add_callback(callback)``.
      callback: One-argument callable to invoke with ``future``.
      future: The object passed to ``callback``.
    """

    def _notify():
        return callback(future)

    registered = handler.add_callback(_notify)
    if not registered:
        callback(future)
182
183
class _FutureCall(grpc.Future, grpc.Call):
    """Future-and-call object for an RPC driven by a testing handler.

    Future-style state that the handler does not track itself (the cached
    unary response and whether a local cancel was accepted) lives in
    ``extras``; everything else is read from the handler on demand.
    """

    def __init__(self, handler, extras):
        self._handler = handler
        self._extras = extras

    def cancel(self):
        """Try to cancel the RPC locally.

        Returns:
          True if the handler accepted the cancellation, otherwise False.
        """
        return _with_extras_cancel(self._handler, self._extras)

    def cancelled(self):
        """Return whether a local cancel has been accepted."""
        return _extras_without_cancelled(self._extras)

    def running(self):
        """Return ``handler.is_active()``."""
        return _running(self._handler)

    def done(self):
        """Return the negation of ``handler.is_active()``."""
        return _done(self._handler)

    def result(self, timeout=None):
        """Return the unary response of the RPC.

        Args:
          timeout: Accepted so the signature matches ``grpc.Future.result``
            and callers may pass it without a ``TypeError``. It is not
            enforced: the underlying ``handler.take_response()`` call takes
            no timeout.

        Returns:
          The response of the RPC (cached after the first successful read).

        Raises:
          _RpcErrorCall: The read taken from the handler carries a status
            code.
        """
        return _with_extras_unary_response(self._handler, self._extras)

    def exception(self, timeout=None):
        """Not implemented; raises ``NotImplementedError``.

        Args:
          timeout: Accepted for ``grpc.Future.exception`` signature
            compatibility; unused.
        """
        return _exception(self._handler)

    def traceback(self, timeout=None):
        """Not implemented; raises ``NotImplementedError``.

        Args:
          timeout: Accepted for ``grpc.Future.traceback`` signature
            compatibility; unused.
        """
        return _traceback(self._handler)

    def add_done_callback(self, fn):
        """Arrange for ``fn(self)`` to be called.

        ``fn`` is called immediately if the handler declines to register the
        callback.
        """
        _add_done_callback(self._handler, fn, self)

    def is_active(self):
        """Return ``handler.is_active()``."""
        return _is_active(self._handler)

    def time_remaining(self):
        """Not implemented; raises ``NotImplementedError``."""
        return _time_remaining(self._handler)

    def add_callback(self, callback):
        """Register ``callback`` with the handler and return its answer."""
        return _add_callback(self._handler, callback)

    def initial_metadata(self):
        """Return ``handler.initial_metadata()``."""
        return _initial_metadata(self._handler)

    def trailing_metadata(self):
        """Return the metadata element of ``handler.termination()``."""
        return _trailing_metadata(self._handler)

    def code(self):
        """Return the code element of ``handler.termination()``."""
        return _code(self._handler)

    def details(self):
        """Return the details element of ``handler.termination()``."""
        return _details(self._handler)
233
234
def consume_requests(request_iterator, handler):
    """Feed ``request_iterator`` into ``handler`` from a background thread.

    A new thread is started that repeatedly takes the next request and passes
    it to ``handler.add_request``. The thread stops when:

    * ``handler.add_request`` returns a falsy value;
    * the iterator is exhausted, after calling ``handler.close_requests()``;
    * any other exception is raised, after logging it and cancelling the RPC
      with ``grpc.StatusCode.UNKNOWN``.

    This function returns as soon as the thread has been started.

    Args:
      request_iterator: Iterator of request values.
      handler: Object exposing ``add_request(request)``, ``close_requests()``
        and ``cancel(code, details)``.
    """

    def _consume():
        while True:
            try:
                request = next(request_iterator)
                added = handler.add_request(request)
                if not added:
                    break
            except StopIteration:
                handler.close_requests()
                break
            except Exception:  # pylint: disable=broad-except
                details = "Exception iterating requests!"
                _LOGGER.exception(details)
                handler.cancel(grpc.StatusCode.UNKNOWN, details)
                # The RPC has just been cancelled, so stop consuming. Without
                # this, an iterator that keeps raising would make this thread
                # log and cancel in an endless loop.
                break

    consumption = threading.Thread(target=_consume)
    consumption.start()
253
254
def blocking_unary_response(handler):
    """Return the single response of an RPC, or raise if it did not succeed.

    Args:
      handler: Object exposing ``take_response()`` and ``termination()``.

    Returns:
      The response taken from the handler, when that read carries no status
      code and the termination code is ``grpc.StatusCode.OK``.

    Raises:
      _RpcErrorCall: In every other case.
    """
    outcome = handler.take_response()
    if outcome.code is None:
        _, status, _ = handler.termination()
        if status is grpc.StatusCode.OK:
            return outcome.response
    # Either the read itself carried a status code or the RPC terminated with
    # a non-OK code.
    raise _RpcErrorCall(handler)
265
266
def blocking_unary_response_with_call(handler):
    """Return the single response of an RPC together with a call object.

    Args:
      handler: Object exposing ``take_response()`` and ``termination()``.

    Returns:
      A ``(response, call)`` pair, where ``call`` is a ``_Call`` wrapping
      ``handler``, when the read carries no status code and the termination
      code is ``grpc.StatusCode.OK``.

    Raises:
      _RpcErrorCall: In every other case.
    """
    outcome = handler.take_response()
    if outcome.code is None:
        _, status, _ = handler.termination()
        if status is grpc.StatusCode.OK:
            return outcome.response, _Call(handler)
    # Either the read itself carried a status code or the RPC terminated with
    # a non-OK code.
    raise _RpcErrorCall(handler)
277
278
def future_call(handler):
    """Wrap ``handler`` in a ``_FutureCall`` with fresh ``_HandlerExtras``.

    Args:
      handler: The RPC handler the returned object will consult.

    Returns:
      A new ``_FutureCall``.
    """
    extras = _HandlerExtras()
    return _FutureCall(handler, extras)
281
282
class ResponseIteratorCall(grpc.Call):
    """Iterator over an RPC's responses that is also a ``grpc.Call``.

    Iteration is delegated to the module-level ``_next`` helper; the call
    queries are answered by asking the wrapped handler directly.
    """

    def __init__(self, handler):
        self._handler = handler

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next response (see ``_next`` for the raise cases)."""
        return _next(self._handler)

    def next(self):
        """Explicitly named equivalent of ``__next__``."""
        return _next(self._handler)

    def cancel(self):
        """Cancel the RPC locally; the handler's return value is discarded."""
        self._handler.cancel(grpc.StatusCode.CANCELLED, "Locally cancelled!")

    def is_active(self):
        """Return ``handler.is_active()``."""
        return self._handler.is_active()

    def time_remaining(self):
        """Not implemented; raises ``NotImplementedError``."""
        return _time_remaining(self._handler)

    def add_callback(self, callback):
        """Register ``callback`` with the handler and return its answer."""
        return self._handler.add_callback(callback)

    def initial_metadata(self):
        """Return ``handler.initial_metadata()``."""
        return self._handler.initial_metadata()

    def trailing_metadata(self):
        """Return the metadata element of ``handler.termination()``."""
        metadata, _, _ = self._handler.termination()
        return metadata

    def code(self):
        """Return the code element of ``handler.termination()``."""
        _, status, _ = self._handler.termination()
        return status

    def details(self):
        """Return the details element of ``handler.termination()``."""
        _, _, message = self._handler.termination()
        return message
318        return _details(self._handler)
319