xref: /aosp_15_r20/external/grpc-grpc/src/python/grpcio_testing/grpc_testing/_server/_server.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2017 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import threading
16
17import grpc_testing
18from grpc_testing import _common
19from grpc_testing._server import _handler
20from grpc_testing._server import _rpc
21from grpc_testing._server import _server_rpc
22from grpc_testing._server import _service
23from grpc_testing._server import _servicer_context
24
25
def _implementation(descriptors_to_servicers, method_descriptor):
    """Find the servicer attribute that implements an RPC method.

    Args:
      descriptors_to_servicers: A subscriptable collection indexed by service
        descriptor whose values are the servicer objects.
      method_descriptor: An object exposing ``containing_service`` (used as
        the lookup key) and ``name`` (the attribute to fetch).

    Returns:
      The attribute named ``method_descriptor.name`` on the servicer found
      under ``method_descriptor.containing_service``.

    Lookup failures are not handled here; whatever the subscript or the
    ``getattr`` call raises propagates to the caller.
    """
    service_descriptor = method_descriptor.containing_service
    return getattr(
        descriptors_to_servicers[service_descriptor],
        method_descriptor.name,
    )
29
30
def _unary_unary_service(request):
    """Build the service behavior for a unary-unary RPC.

    Args:
      request: The single request value handed to the implementation.

    Returns:
      A callable accepting ``(implementation, rpc, servicer_context)`` that
      forwards them, together with ``request``, to ``_service.unary_unary``.
    """

    def run_unary_unary(implementation, rpc, servicer_context):
        # The request is captured from the enclosing call; everything else is
        # supplied when the behavior is eventually run.
        _service.unary_unary(
            implementation,
            rpc,
            request,
            servicer_context,
        )

    return run_unary_unary
36
37
def _unary_stream_service(request):
    """Build the service behavior for a unary-stream RPC.

    Args:
      request: The single request value handed to the implementation.

    Returns:
      A callable accepting ``(implementation, rpc, servicer_context)`` that
      forwards them, together with ``request``, to ``_service.unary_stream``.
    """

    def run_unary_stream(implementation, rpc, servicer_context):
        # The request is captured from the enclosing call; everything else is
        # supplied when the behavior is eventually run.
        _service.unary_stream(
            implementation,
            rpc,
            request,
            servicer_context,
        )

    return run_unary_stream
43
44
def _stream_unary_service(handler):
    """Build the service behavior for a stream-unary RPC.

    Args:
      handler: The handler object passed through to ``_service.stream_unary``
        in place of a single request value.

    Returns:
      A callable accepting ``(implementation, rpc, servicer_context)`` that
      forwards them, together with ``handler``, to ``_service.stream_unary``.
    """

    def run_stream_unary(implementation, rpc, servicer_context):
        # The handler is captured from the enclosing call; everything else is
        # supplied when the behavior is eventually run.
        _service.stream_unary(
            implementation,
            rpc,
            handler,
            servicer_context,
        )

    return run_stream_unary
50
51
def _stream_stream_service(handler):
    """Build the service behavior for a stream-stream RPC.

    Args:
      handler: The handler object passed through to
        ``_service.stream_stream`` in place of a single request value.

    Returns:
      A callable accepting ``(implementation, rpc, servicer_context)`` that
      forwards them, together with ``handler``, to
      ``_service.stream_stream``.
    """

    def run_stream_stream(implementation, rpc, servicer_context):
        # The handler is captured from the enclosing call; everything else is
        # supplied when the behavior is eventually run.
        _service.stream_stream(
            implementation,
            rpc,
            handler,
            servicer_context,
        )

    return run_stream_stream
57
58
class _Serverish(_common.Serverish):
    """A ``_common.Serverish`` that runs each invoked RPC on its own thread."""

    def __init__(self, descriptors_to_servicers, time):
        """Store the servicer table and the time object.

        Args:
          descriptors_to_servicers: Collection indexed by service descriptor
            whose values are servicer objects; consulted on every invocation.
          time: The time object handed to each ``ServicerContext`` created by
            this instance.
        """
        self._time = time
        self._descriptors_to_servicers = descriptors_to_servicers

    def _invoke(
        self,
        service_behavior,
        method_descriptor,
        handler,
        invocation_metadata,
        deadline,
    ):
        """Resolve the implementation and start servicing one RPC.

        Args:
          service_behavior: Callable run on the new thread with
            ``(implementation, rpc, servicer_context)``.
          method_descriptor: Descriptor used to look up the implementation.
          handler: Handler for the RPC; also receives the abort callback.
          invocation_metadata: Metadata passed to the ``_rpc.Rpc`` created
            for this invocation.
          deadline: Deadline value passed to the ``ServicerContext``.
        """
        # Resolved first so that a lookup failure surfaces before any RPC
        # state is created.
        implementation = _implementation(
            self._descriptors_to_servicers, method_descriptor
        )
        rpc = _rpc.Rpc(handler, invocation_metadata)

        # A falsy result means no service thread is started (presumably the
        # RPC has already terminated -- confirm against the handler).
        if not handler.add_termination_callback(rpc.extrinsic_abort):
            return

        context = _servicer_context.ServicerContext(rpc, self._time, deadline)
        threading.Thread(
            target=service_behavior,
            args=(implementation, rpc, context),
        ).start()

    def invoke_unary_unary(
        self, method_descriptor, handler, invocation_metadata, request, deadline
    ):
        """Service a unary-unary RPC carrying ``request``."""
        behavior = _unary_unary_service(request)
        self._invoke(
            behavior, method_descriptor, handler, invocation_metadata, deadline
        )

    def invoke_unary_stream(
        self, method_descriptor, handler, invocation_metadata, request, deadline
    ):
        """Service a unary-stream RPC carrying ``request``."""
        behavior = _unary_stream_service(request)
        self._invoke(
            behavior, method_descriptor, handler, invocation_metadata, deadline
        )

    def invoke_stream_unary(
        self, method_descriptor, handler, invocation_metadata, deadline
    ):
        """Service a stream-unary RPC; ``handler`` is given to the behavior."""
        behavior = _stream_unary_service(handler)
        self._invoke(
            behavior, method_descriptor, handler, invocation_metadata, deadline
        )

    def invoke_stream_stream(
        self, method_descriptor, handler, invocation_metadata, deadline
    ):
        """Service a stream-stream RPC; ``handler`` is given to the behavior."""
        behavior = _stream_stream_service(handler)
        self._invoke(
            behavior, method_descriptor, handler, invocation_metadata, deadline
        )
133
134
def _deadline_and_handler(requests_closed, time, timeout):
    """Create the handler for an RPC along with its deadline.

    Args:
      requests_closed: Flag forwarded unchanged to the handler factory.
      time: Time object; its ``time()`` result is the base for the deadline
        and it is also forwarded to the deadline-aware handler factory.
      timeout: Duration added to ``time.time()`` to form the deadline, or
        None for an RPC without a deadline.

    Returns:
      A ``(deadline, handler)`` pair. ``deadline`` is None when ``timeout``
      is None; otherwise it is ``time.time() + timeout``.
    """
    if timeout is not None:
        deadline = time.time() + timeout
        return deadline, _handler.handler_with_deadline(
            requests_closed, time, deadline
        )
    return None, _handler.handler_without_deadline(requests_closed)
144
145
class _Server(grpc_testing.Server):
    """``grpc_testing.Server`` that forwards invocations to a serverish.

    Every ``invoke_*`` method follows the same three steps: build a handler
    (and deadline) from the timeout, pass the call on to the wrapped
    serverish, and return a ``_server_rpc`` wrapper around the handler.
    """

    def __init__(self, serverish, time):
        """Keep the serverish to delegate to and the time object.

        Args:
          serverish: Object whose ``invoke_*`` methods receive each call.
          time: Time object used when computing deadlines.
        """
        self._time = time
        self._serverish = serverish

    def invoke_unary_unary(
        self, method_descriptor, invocation_metadata, request, timeout
    ):
        """Invoke a unary-unary method; returns a ``UnaryUnaryServerRpc``."""
        # The handler is built with requests_closed=True for unary requests.
        deadline_and_handler = _deadline_and_handler(True, self._time, timeout)
        deadline, handler = deadline_and_handler
        self._serverish.invoke_unary_unary(
            method_descriptor, handler, invocation_metadata, request, deadline
        )
        server_rpc = _server_rpc.UnaryUnaryServerRpc(handler)
        return server_rpc

    def invoke_unary_stream(
        self, method_descriptor, invocation_metadata, request, timeout
    ):
        """Invoke a unary-stream method; returns a ``UnaryStreamServerRpc``."""
        # The handler is built with requests_closed=True for unary requests.
        deadline_and_handler = _deadline_and_handler(True, self._time, timeout)
        deadline, handler = deadline_and_handler
        self._serverish.invoke_unary_stream(
            method_descriptor, handler, invocation_metadata, request, deadline
        )
        server_rpc = _server_rpc.UnaryStreamServerRpc(handler)
        return server_rpc

    def invoke_stream_unary(
        self, method_descriptor, invocation_metadata, timeout
    ):
        """Invoke a stream-unary method; returns a ``StreamUnaryServerRpc``."""
        # The handler is built with requests_closed=False for streamed
        # requests.
        deadline_and_handler = _deadline_and_handler(False, self._time, timeout)
        deadline, handler = deadline_and_handler
        self._serverish.invoke_stream_unary(
            method_descriptor, handler, invocation_metadata, deadline
        )
        server_rpc = _server_rpc.StreamUnaryServerRpc(handler)
        return server_rpc

    def invoke_stream_stream(
        self, method_descriptor, invocation_metadata, timeout
    ):
        """Invoke a stream-stream method; returns a ``StreamStreamServerRpc``."""
        # The handler is built with requests_closed=False for streamed
        # requests.
        deadline_and_handler = _deadline_and_handler(False, self._time, timeout)
        deadline, handler = deadline_and_handler
        self._serverish.invoke_stream_stream(
            method_descriptor, handler, invocation_metadata, deadline
        )
        server_rpc = _server_rpc.StreamStreamServerRpc(handler)
        return server_rpc
186
187
def server_from_descriptor_to_servicers(descriptors_to_servicers, time):
    """Create a ``_Server`` backed by the given servicers.

    Args:
      descriptors_to_servicers: Collection indexed by service descriptor
        whose values are the servicer objects to be exercised.
      time: Time object shared by the returned server and the serverish it
        wraps.

    Returns:
      A ``_Server`` wrapping a new ``_Serverish`` built from the arguments.
    """
    serverish = _Serverish(descriptors_to_servicers, time)
    return _Server(serverish, time)
190