xref: /aosp_15_r20/external/grpc-grpc/src/python/grpcio_tests/tests/stress/unary_stream_benchmark.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
# Copyright 2019 The gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import contextlib
import datetime
import subprocess
import sys
import threading
import time

import grpc
import grpc.experimental

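# Benchmark parameters: server port, response payload size in bytes, and
# number of responses per stream.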
_PORT = 5741
_MESSAGE_SIZE = 4
_RESPONSE_COUNT = 32 * 1024

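# Source for the benchmark server, run in a separate Python process below.
# The handler streams `response_count` copies of a `message_size`-byte payload
# per request; the listening port is interpolated via the trailing `% _PORT`.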
_SERVER_CODE = (
    """
import datetime
import threading
import grpc
from concurrent import futures
from src.python.grpcio_tests.tests.stress import unary_stream_benchmark_pb2
from src.python.grpcio_tests.tests.stress import unary_stream_benchmark_pb2_grpc

class Handler(unary_stream_benchmark_pb2_grpc.UnaryStreamBenchmarkServiceServicer):

  def Benchmark(self, request, context):
    payload = b'\\x00\\x01' * int(request.message_size / 2)
    for _ in range(request.response_count):
      yield unary_stream_benchmark_pb2.BenchmarkResponse(response=payload)


server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
server.add_insecure_port('[::]:%d')
unary_stream_benchmark_pb2_grpc.add_UnaryStreamBenchmarkServiceServicer_to_server(Handler(), server)
server.start()
server.wait_for_termination()
"""
    % _PORT
)

try:
    from src.python.grpcio_tests.tests.stress import (
        unary_stream_benchmark_pb2_grpc,
    )
    from src.python.grpcio_tests.tests.stress import unary_stream_benchmark_pb2

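    # Client channel options: generous metadata and receive-size limits, plus
    # the experimental single-threaded unary-stream optimization.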
    _GRPC_CHANNEL_OPTIONS = [
        ("grpc.max_metadata_size", 16 * 1024 * 1024),
        ("grpc.max_receive_message_length", 64 * 1024 * 1024),
        (grpc.experimental.ChannelOptions.SingleThreadedUnaryStream, 1),
    ]

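    # Runs the benchmark server in a child Python process for the duration of
    # the `with` block, then dumps the child's captured stdout/stderr.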
    @contextlib.contextmanager
    def _running_server():
        server_process = subprocess.Popen(
            [sys.executable, "-c", _SERVER_CODE],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        try:
            yield
        finally:
            server_process.terminate()
            server_process.wait()
            sys.stdout.write("stdout: {}".format(server_process.stdout.read()))
            sys.stdout.flush()
            sys.stdout.write("stderr: {}".format(server_process.stderr.read()))
            sys.stdout.flush()

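    # Issues one unary-stream Benchmark call, drains the response stream, and
    # returns the elapsed wall-clock time as a datetime.timedelta.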
    def profile(message_size, response_count):
        request = unary_stream_benchmark_pb2.BenchmarkRequest(
            message_size=message_size, response_count=response_count
        )
        with grpc.insecure_channel(
            "[::]:{}".format(_PORT), options=_GRPC_CHANNEL_OPTIONS
        ) as channel:
            stub = (
                unary_stream_benchmark_pb2_grpc.UnaryStreamBenchmarkServiceStub(
                    channel
                )
            )
            start = datetime.datetime.now()
            call = stub.Benchmark(request, wait_for_ready=True)
            for message in call:
                pass
            end = datetime.datetime.now()
        return end - start

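    # Repeatedly profiles the call, writing one latency sample (in seconds)
    # per line to stdout.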
    def main():
        with _running_server():
            for i in range(1000):
                latency = profile(_MESSAGE_SIZE, 1024)
                sys.stdout.write("{}\n".format(latency.total_seconds()))
                sys.stdout.flush()

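    # Run directly as a script; the src.python... imports above presumably
    # resolve when invoked from the gRPC repository root.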
    if __name__ == "__main__":
        main()

except ImportError:
    # NOTE(rbellevi): The test runner should not load this module.
    pass