1# Copyright 2018 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Defines a number of module-scope gRPC scenarios to test server shutdown.""" 15 16import argparse 17from concurrent import futures 18import logging 19import os 20import queue 21import threading 22import time 23 24import grpc 25 26from tests.unit import test_common 27 28WAIT_TIME = 1000 29 30REQUEST = b"request" 31RESPONSE = b"response" 32 33SERVER_RAISES_EXCEPTION = "server_raises_exception" 34SERVER_DEALLOCATED = "server_deallocated" 35SERVER_FORK_CAN_EXIT = "server_fork_can_exit" 36 37FORK_EXIT = "/test/ForkExit" 38 39 40def fork_and_exit(request, servicer_context): 41 pid = os.fork() 42 if pid == 0: 43 os._exit(0) 44 return RESPONSE 45 46 47class GenericHandler(grpc.GenericRpcHandler): 48 def service(self, handler_call_details): 49 if handler_call_details.method == FORK_EXIT: 50 return grpc.unary_unary_rpc_method_handler(fork_and_exit) 51 else: 52 return None 53 54 55def run_server(port_queue): 56 server = test_common.test_server() 57 port = server.add_insecure_port("[::]:0") 58 port_queue.put(port) 59 server.add_generic_rpc_handlers((GenericHandler(),)) 60 server.start() 61 # threading.Event.wait() does not exhibit the bug identified in 62 # https://github.com/grpc/grpc/issues/17093, sleep instead 63 time.sleep(WAIT_TIME) 64 65 66def run_test(args): 67 if args.scenario == SERVER_RAISES_EXCEPTION: 68 server = test_common.test_server() 69 server.start() 70 raise Exception() 71 elif args.scenario == SERVER_DEALLOCATED: 72 server = test_common.test_server() 73 server.start() 74 server.__del__() 75 while server._state.stage != grpc._server._ServerStage.STOPPED: 76 pass 77 elif args.scenario == SERVER_FORK_CAN_EXIT: 78 port_queue = queue.Queue() 79 thread = threading.Thread(target=run_server, args=(port_queue,)) 80 thread.daemon = True 81 thread.start() 82 port = port_queue.get() 83 channel = grpc.insecure_channel("localhost:%d" % port) 84 multi_callable = channel.unary_unary( 85 FORK_EXIT, 86 _registered_method=True, 87 ) 88 result, call = multi_callable.with_call(REQUEST, wait_for_ready=True) 89 os.wait() 90 else: 91 raise ValueError("unknown test scenario") 92 93 94if __name__ == "__main__": 95 logging.basicConfig() 96 parser = argparse.ArgumentParser() 97 parser.add_argument("scenario", type=str) 98 args = parser.parse_args() 99 run_test(args) 100