# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import threading

import grpc_testing
from grpc_testing import _common
from grpc_testing._server import _handler
from grpc_testing._server import _rpc
from grpc_testing._server import _server_rpc
from grpc_testing._server import _service
from grpc_testing._server import _servicer_context


def _implementation(descriptors_to_servicers, method_descriptor):
    """Return the servicer attribute that implements a method.

    The servicer is looked up in ``descriptors_to_servicers`` under
    ``method_descriptor.containing_service``; the attribute named
    ``method_descriptor.name`` is then fetched from it.

    Raises:
      KeyError: if the containing service has no entry in the mapping.
      AttributeError: if the servicer has no attribute with that name.
    """
    service_descriptor = method_descriptor.containing_service
    servicer = descriptors_to_servicers[service_descriptor]
    return getattr(servicer, method_descriptor.name)


def _unary_unary_service(request):
    """Build a behavior that forwards ``request`` to ``_service.unary_unary``."""

    # The inner name ``service`` is kept on purpose: threading uses the
    # target's __name__ when it derives a default thread name.
    def service(implementation, rpc, servicer_context):
        _service.unary_unary(implementation, rpc, request, servicer_context)

    return service


def _unary_stream_service(request):
    """Build a behavior that forwards ``request`` to ``_service.unary_stream``."""

    def service(implementation, rpc, servicer_context):
        _service.unary_stream(implementation, rpc, request, servicer_context)

    return service


def _stream_unary_service(handler):
    """Build a behavior that forwards ``handler`` to ``_service.stream_unary``."""

    def service(implementation, rpc, servicer_context):
        _service.stream_unary(implementation, rpc, handler, servicer_context)

    return service


def _stream_stream_service(handler):
    """Build a behavior that forwards ``handler`` to ``_service.stream_stream``."""

    def service(implementation, rpc, servicer_context):
        _service.stream_stream(implementation, rpc, handler, servicer_context)

    return service


class _Serverish(_common.Serverish):
    """Runs servicer implementations for invoked RPCs on their own threads."""

    def __init__(self, descriptors_to_servicers, time):
        """Store the service-to-servicer mapping and the time object."""
        self._time = time
        self._descriptors_to_servicers = descriptors_to_servicers

    def _invoke(
        self,
        service_behavior,
        method_descriptor,
        handler,
        invocation_metadata,
        deadline,
    ):
        """Start ``service_behavior`` on a new thread for one RPC.

        The implementation is resolved first, so a lookup failure raises
        before any RPC state is created. ``rpc.extrinsic_abort`` is then
        registered as a termination callback on ``handler``; the service
        thread is only started when that registration returns a true value.
        """
        implementation = _implementation(
            self._descriptors_to_servicers, method_descriptor
        )
        rpc = _rpc.Rpc(handler, invocation_metadata)
        if not handler.add_termination_callback(rpc.extrinsic_abort):
            # Registration was refused, so no servicer code is run.
            return
        servicer_context = _servicer_context.ServicerContext(
            rpc, self._time, deadline
        )
        threading.Thread(
            target=service_behavior,
            args=(implementation, rpc, servicer_context),
        ).start()

    def invoke_unary_unary(
        self, method_descriptor, handler, invocation_metadata, request, deadline
    ):
        """Invoke a unary-unary method with the given request."""
        behavior = _unary_unary_service(request)
        self._invoke(
            behavior, method_descriptor, handler, invocation_metadata, deadline
        )

    def invoke_unary_stream(
        self, method_descriptor, handler, invocation_metadata, request, deadline
    ):
        """Invoke a unary-stream method with the given request."""
        behavior = _unary_stream_service(request)
        self._invoke(
            behavior, method_descriptor, handler, invocation_metadata, deadline
        )

    def invoke_stream_unary(
        self, method_descriptor, handler, invocation_metadata, deadline
    ):
        """Invoke a stream-unary method; ``handler`` is passed to the behavior."""
        behavior = _stream_unary_service(handler)
        self._invoke(
            behavior, method_descriptor, handler, invocation_metadata, deadline
        )

    def invoke_stream_stream(
        self, method_descriptor, handler, invocation_metadata, deadline
    ):
        """Invoke a stream-stream method; ``handler`` is passed to the behavior."""
        behavior = _stream_stream_service(handler)
        self._invoke(
            behavior, method_descriptor, handler, invocation_metadata, deadline
        )


def _deadline_and_handler(requests_closed, time, timeout):
    """Return a ``(deadline, handler)`` pair for a new RPC.

    With ``timeout`` of ``None`` the deadline is ``None`` and the handler
    comes from ``_handler.handler_without_deadline``. Otherwise the deadline
    is ``time.time() + timeout`` and the handler comes from
    ``_handler.handler_with_deadline``.
    """
    if timeout is None:
        return None, _handler.handler_without_deadline(requests_closed)
    deadline = time.time() + timeout
    deadline_handler = _handler.handler_with_deadline(
        requests_closed, time, deadline
    )
    return deadline, deadline_handler


class _Server(grpc_testing.Server):
    """``grpc_testing.Server`` that delegates invocations to a serverish."""

    def __init__(self, serverish, time):
        """Store the serverish to delegate to and the time object."""
        self._time = time
        self._serverish = serverish

    def invoke_unary_unary(
        self, method_descriptor, invocation_metadata, request, timeout
    ):
        """Invoke a unary-unary method and return its ``UnaryUnaryServerRpc``."""
        # Unary-request handlers are created with requests_closed=True.
        deadline, rpc_handler = _deadline_and_handler(
            True, self._time, timeout
        )
        self._serverish.invoke_unary_unary(
            method_descriptor,
            rpc_handler,
            invocation_metadata,
            request,
            deadline,
        )
        return _server_rpc.UnaryUnaryServerRpc(rpc_handler)

    def invoke_unary_stream(
        self, method_descriptor, invocation_metadata, request, timeout
    ):
        """Invoke a unary-stream method and return its ``UnaryStreamServerRpc``."""
        deadline, rpc_handler = _deadline_and_handler(
            True, self._time, timeout
        )
        self._serverish.invoke_unary_stream(
            method_descriptor,
            rpc_handler,
            invocation_metadata,
            request,
            deadline,
        )
        return _server_rpc.UnaryStreamServerRpc(rpc_handler)

    def invoke_stream_unary(
        self, method_descriptor, invocation_metadata, timeout
    ):
        """Invoke a stream-unary method and return its ``StreamUnaryServerRpc``."""
        # Streaming-request handlers are created with requests_closed=False.
        deadline, rpc_handler = _deadline_and_handler(
            False, self._time, timeout
        )
        self._serverish.invoke_stream_unary(
            method_descriptor, rpc_handler, invocation_metadata, deadline
        )
        return _server_rpc.StreamUnaryServerRpc(rpc_handler)

    def invoke_stream_stream(
        self, method_descriptor, invocation_metadata, timeout
    ):
        """Invoke a stream-stream method and return its ``StreamStreamServerRpc``."""
        deadline, rpc_handler = _deadline_and_handler(
            False, self._time, timeout
        )
        self._serverish.invoke_stream_stream(
            method_descriptor, rpc_handler, invocation_metadata, deadline
        )
        return _server_rpc.StreamStreamServerRpc(rpc_handler)


def server_from_descriptor_to_servicers(descriptors_to_servicers, time):
    """Create a ``_Server`` backed by a ``_Serverish`` sharing the same time."""
    serverish = _Serverish(descriptors_to_servicers, time)
    return _Server(serverish, time)