# Copyright 2019 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The Python implementation of the TestServicer."""

import time

import grpc

from src.proto.grpc.testing import empty_pb2
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import test_pb2_grpc

_INITIAL_METADATA_KEY = "x-grpc-test-echo-initial"
_TRAILING_METADATA_KEY = "x-grpc-test-echo-trailing-bin"
_US_IN_A_SECOND = 1000 * 1000


def _maybe_echo_metadata(servicer_context):
    """Echoes the well-known test metadata entries back to the caller.

    The invocation metadata is read from the context. If it contains
    _INITIAL_METADATA_KEY, that entry is sent back as initial metadata; if
    it contains _TRAILING_METADATA_KEY, that entry is set as trailing
    metadata. Keys that are absent are ignored.

    Args:
      servicer_context: The context of the RPC being served.
    """
    received = dict(servicer_context.invocation_metadata())
    if _INITIAL_METADATA_KEY in received:
        servicer_context.send_initial_metadata(
            ((_INITIAL_METADATA_KEY, received[_INITIAL_METADATA_KEY]),)
        )
    if _TRAILING_METADATA_KEY in received:
        servicer_context.set_trailing_metadata(
            ((_TRAILING_METADATA_KEY, received[_TRAILING_METADATA_KEY]),)
        )


def _maybe_echo_status_and_message(request, servicer_context):
    """Applies the status requested by the client to the RPC context.

    Nothing happens unless the request has its "response_status" field set;
    in that case the field's code and message are passed to the context's
    set_code and set_details respectively.

    Args:
      request: A request message that may carry a response_status field.
      servicer_context: The context of the RPC being served.
    """
    if not request.HasField("response_status"):
        return
    requested_status = request.response_status
    servicer_context.set_code(requested_status.code)
    servicer_context.set_details(requested_status.message)


def _sleep_for_interval(interval_us):
    """Blocks for interval_us microseconds; a zero interval does not sleep."""
    if interval_us != 0:
        # time.sleep takes seconds, the request expresses microseconds.
        time.sleep(interval_us / _US_IN_A_SECOND)


def _zero_filled_response(payload_type, size):
    """Builds a streaming response whose body is `size` zero bytes."""
    return messages_pb2.StreamingOutputCallResponse(
        payload=messages_pb2.Payload(
            type=payload_type,
            body=b"\x00" * size,
        )
    )


class TestService(test_pb2_grpc.TestServiceServicer):
    """Servicer providing the RPC handlers of the generated TestService."""

    def EmptyCall(self, request, context):
        """Echoes test metadata, if any, and returns an Empty message."""
        _maybe_echo_metadata(context)
        return empty_pb2.Empty()

    def UnaryCall(self, request, context):
        """Returns a single zero-filled payload of the requested size.

        Test metadata and the requested status, when present, are echoed
        before the response is built.
        """
        _maybe_echo_metadata(context)
        _maybe_echo_status_and_message(request, context)
        zeroed_payload = messages_pb2.Payload(
            type=messages_pb2.COMPRESSABLE,
            body=b"\x00" * request.response_size,
        )
        return messages_pb2.SimpleResponse(payload=zeroed_payload)

    def StreamingOutputCall(self, request, context):
        """Yields one zero-filled response per requested response parameter.

        Each response is preceded by the sleep interval given in its
        parameters and carries the payload type named by the request's
        response_type.
        """
        _maybe_echo_status_and_message(request, context)
        for parameters in request.response_parameters:
            _sleep_for_interval(parameters.interval_us)
            yield _zero_filled_response(request.response_type, parameters.size)

    def StreamingInputCall(self, request_iterator, context):
        """Consumes the request stream and reports the total body size."""
        total_body_size = sum(
            len(request.payload.body)
            for request in request_iterator
            if request.payload is not None and request.payload.body
        )
        return messages_pb2.StreamingInputCallResponse(
            aggregated_payload_size=total_body_size
        )

    def FullDuplexCall(self, request_iterator, context):
        """Answers every incoming request with its requested responses.

        Test metadata is echoed once up front. For each request the
        requested status is applied, then one zero-filled response is
        yielded per response parameter, using the type of the request's own
        payload.
        """
        _maybe_echo_metadata(context)
        for request in request_iterator:
            _maybe_echo_status_and_message(request, context)
            for parameters in request.response_parameters:
                _sleep_for_interval(parameters.interval_us)
                yield _zero_filled_response(
                    request.payload.type, parameters.size
                )

    # NOTE(nathaniel): Apparently this is the same as the full-duplex call?
    # NOTE(atash): It isn't even called in the interop spec (Oct 22 2015)...
    def HalfDuplexCall(self, request_iterator, context):
        """Delegates to FullDuplexCall and returns its response iterator."""
        return self.FullDuplexCall(request_iterator, context)