# Copyright 2019 The gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Latency benchmark for unary-stream RPCs.

Starts a gRPC server in a child Python process, then repeatedly issues a
unary-stream ``Benchmark`` call against it and prints the time taken to
consume each response stream, in seconds, one value per line.
"""

import contextlib
import datetime
import subprocess
import sys
import threading
import time

import grpc
import grpc.experimental

_PORT = 5741
_MESSAGE_SIZE = 4
_RESPONSE_COUNT = 32 * 1024

# Source of the server program. It is handed to a child interpreter via
# ``python -c``; the ``%d`` placeholder is filled with _PORT.
_SERVER_CODE = (
    """
import datetime
import threading
import grpc
from concurrent import futures
from src.python.grpcio_tests.tests.stress import unary_stream_benchmark_pb2
from src.python.grpcio_tests.tests.stress import unary_stream_benchmark_pb2_grpc

class Handler(unary_stream_benchmark_pb2_grpc.UnaryStreamBenchmarkServiceServicer):

  def Benchmark(self, request, context):
    payload = b'\\x00\\x01' * int(request.message_size / 2)
    for _ in range(request.response_count):
      yield unary_stream_benchmark_pb2.BenchmarkResponse(response=payload)


server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
server.add_insecure_port('[::]:%d')
unary_stream_benchmark_pb2_grpc.add_UnaryStreamBenchmarkServiceServicer_to_server(Handler(), server)
server.start()
server.wait_for_termination()
"""
    % _PORT
)

# Only the imports are guarded: an ImportError raised later, while the
# benchmark itself is running, must surface instead of being swallowed.
try:
    from src.python.grpcio_tests.tests.stress import (
        unary_stream_benchmark_pb2_grpc,
    )
    from src.python.grpcio_tests.tests.stress import unary_stream_benchmark_pb2
except ImportError:
    # NOTE(rbellevi): The test runner should not load this module.
    pass
else:
    _GRPC_CHANNEL_OPTIONS = [
        ("grpc.max_metadata_size", 16 * 1024 * 1024),
        ("grpc.max_receive_message_length", 64 * 1024 * 1024),
        (grpc.experimental.ChannelOptions.SingleThreadedUnaryStream, 1),
    ]

    @contextlib.contextmanager
    def _running_server():
        """Run the benchmark server in a child process for the `with` body.

        On exit the child is terminated and everything it wrote to its
        stdout and stderr is echoed to this process's stdout.
        """
        server_process = subprocess.Popen(
            [sys.executable, "-c", _SERVER_CODE],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        try:
            yield
        finally:
            server_process.terminate()
            # communicate() reaps the child while draining both pipes and
            # closes them afterwards; wait() followed by manual reads is
            # the pattern the subprocess docs warn about when using PIPE.
            stdout_data, stderr_data = server_process.communicate()
            sys.stdout.write("stdout: {}".format(stdout_data))
            sys.stdout.flush()
            sys.stdout.write("stderr: {}".format(stderr_data))
            sys.stdout.flush()

    def profile(message_size, response_count):
        """Time one unary-stream Benchmark call.

        Args:
          message_size: Value for the request's `message_size` field.
          response_count: Value for the request's `response_count` field.

        Returns:
          A datetime.timedelta covering the span from just before the call
          is issued until the response stream has been fully consumed.
        """
        request = unary_stream_benchmark_pb2.BenchmarkRequest(
            message_size=message_size, response_count=response_count
        )
        with grpc.insecure_channel(
            "[::]:{}".format(_PORT), options=_GRPC_CHANNEL_OPTIONS
        ) as channel:
            stub = (
                unary_stream_benchmark_pb2_grpc.UnaryStreamBenchmarkServiceStub(
                    channel
                )
            )
            # perf_counter() is monotonic, so a wall-clock adjustment in the
            # middle of a call cannot distort the measured latency.
            start = time.perf_counter()
            call = stub.Benchmark(request, wait_for_ready=True)
            for _ in call:
                pass
            elapsed = time.perf_counter() - start
            return datetime.timedelta(seconds=elapsed)

    def main():
        """Run 1000 profiled calls and print each latency in seconds."""
        with _running_server():
            for _ in range(1000):
                # NOTE(review): _RESPONSE_COUNT is defined above but a
                # literal 1024 is passed here -- confirm which is intended.
                latency = profile(_MESSAGE_SIZE, 1024)
                sys.stdout.write("{}\n".format(latency.total_seconds()))
                sys.stdout.flush()

    if __name__ == "__main__":
        main()