xref: /aosp_15_r20/external/grpc-grpc/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2018 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Defines a number of module-scope gRPC scenarios to test server shutdown."""
15
16import argparse
17from concurrent import futures
18import logging
19import os
20import queue
21import threading
22import time
23
24import grpc
25
26from tests.unit import test_common
27
# Fully-qualified name of the only RPC method GenericHandler serves.
FORK_EXIT = "/test/ForkExit"

# Scenario names accepted as the command-line "scenario" argument and
# dispatched on by run_test().
SERVER_RAISES_EXCEPTION = "server_raises_exception"
SERVER_DEALLOCATED = "server_deallocated"
SERVER_FORK_CAN_EXIT = "server_fork_can_exit"

# Raw payloads exchanged by the FORK_EXIT RPC.
REQUEST = b"request"
RESPONSE = b"response"

# Seconds the background server thread sleeps (passed to time.sleep) so the
# server stays up while the main thread drives the scenario.
WAIT_TIME = 1000
38
39
def fork_and_exit(request, servicer_context):
    """Unary-unary RPC behavior: fork, end the child at once, answer in the parent.

    Args:
        request: The incoming request payload (unused).
        servicer_context: The RPC context supplied by the server (unused).

    Returns:
        RESPONSE, returned from the parent process only.
    """
    if os.fork() == 0:
        # Child process: terminate immediately with status 0. os._exit() is
        # used so the child leaves without running normal interpreter
        # shutdown.
        os._exit(0)
    return RESPONSE
45
46
class GenericHandler(grpc.GenericRpcHandler):
    """Generic RPC handler that serves only the FORK_EXIT method."""

    def service(self, handler_call_details):
        """Look up the method handler for an incoming call.

        Args:
            handler_call_details: Call description; only its ``method``
                attribute is inspected.

        Returns:
            A unary-unary method handler wrapping fork_and_exit when the
            requested method is FORK_EXIT, otherwise None.
        """
        if handler_call_details.method != FORK_EXIT:
            return None
        return grpc.unary_unary_rpc_method_handler(fork_and_exit)
53
54
def run_server(port_queue):
    """Start a test server and keep the calling thread alive.

    Binds an insecure port on "[::]:0", publishes the port number returned by
    the bind through ``port_queue``, registers GenericHandler, starts the
    server and then sleeps for WAIT_TIME seconds.

    Args:
        port_queue: Queue on which the bound port number is put for the
            caller to pick up.
    """
    grpc_server = test_common.test_server()
    # Publish the port before the server is started, so the consumer may
    # receive it ahead of start().
    port_queue.put(grpc_server.add_insecure_port("[::]:0"))
    handlers = (GenericHandler(),)
    grpc_server.add_generic_rpc_handlers(handlers)
    grpc_server.start()
    # threading.Event.wait() does not exhibit the bug identified in
    # https://github.com/grpc/grpc/issues/17093, sleep instead
    time.sleep(WAIT_TIME)
64
65
def run_test(args):
    """Run the server-shutdown scenario selected by ``args.scenario``.

    Scenarios:
        SERVER_RAISES_EXCEPTION: start a server, then raise a bare Exception
            while it is still running.
        SERVER_DEALLOCATED: start a server, invoke its ``__del__`` directly
            and spin until its internal stage reads STOPPED.
        SERVER_FORK_CAN_EXIT: run a server on a daemon thread, call the
            FORK_EXIT method on it (whose handler forks), then wait for a
            child process to exit.

    Args:
        args: Parsed command-line namespace; only ``scenario`` is read.

    Raises:
        Exception: Always, in the SERVER_RAISES_EXCEPTION scenario.
        ValueError: If ``args.scenario`` is not a known scenario name.
    """
    scenario = args.scenario

    if scenario == SERVER_RAISES_EXCEPTION:
        running_server = test_common.test_server()
        running_server.start()
        # Deliberate: the scenario is an uncaught exception with a live server.
        raise Exception()

    if scenario == SERVER_DEALLOCATED:
        doomed_server = test_common.test_server()
        doomed_server.start()
        # Deliberate: call the finalizer by hand, keeping the reference alive.
        doomed_server.__del__()
        stopped_stage = grpc._server._ServerStage.STOPPED
        # Busy-wait until the server reports that it has fully stopped.
        while doomed_server._state.stage != stopped_stage:
            continue
        return

    if scenario == SERVER_FORK_CAN_EXIT:
        ports = queue.Queue()
        # Daemon thread: run_server sleeps for WAIT_TIME and must not keep
        # the process alive once this function is done.
        server_thread = threading.Thread(
            target=run_server, args=(ports,), daemon=True
        )
        server_thread.start()
        # Blocks until run_server has published the bound port.
        target = "localhost:%d" % ports.get()
        channel = grpc.insecure_channel(target)
        fork_exit_call = channel.unary_unary(
            FORK_EXIT,
            _registered_method=True,
        )
        # The bindings are unused but kept so the returned objects stay
        # referenced until this function returns.
        _response, _call = fork_exit_call.with_call(
            REQUEST, wait_for_ready=True
        )
        # The handler runs inside this process (on the server thread), so the
        # process it forks is our child; wait for a child to exit.
        os.wait()
        return

    raise ValueError("unknown test scenario")
92
93
if __name__ == "__main__":
    # Script entry point: set up default logging, read the single positional
    # "scenario" argument and run the matching scenario.
    logging.basicConfig()
    cli = argparse.ArgumentParser()
    cli.add_argument("scenario", type=str)
    args = cli.parse_args()
    run_test(args)
100