xref: /aosp_15_r20/external/grpc-grpc/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2018 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Tests clean shutdown of server on various interpreter exit conditions.
15
Each test case in this module spawns its own subprocess; a test is
considered successful if that subprocess exits instead of freezing or timing out.
18"""
19
20import atexit
21import logging
22import os
23import subprocess
24import sys
25import threading
26import unittest
27
28from tests.unit import _server_shutdown_scenarios
29
# Directory holding this test module, with any symlinks resolved.
_MODULE_DIR = os.path.dirname(os.path.realpath(__file__))

# Absolute path of the scenario script that sits beside this module.
SCENARIO_FILE = os.path.abspath(
    os.path.join(_MODULE_DIR, "_server_shutdown_scenarios.py")
)

# The scenario script is launched with the interpreter running these tests.
INTERPRETER = sys.executable
BASE_COMMAND = [INTERPRETER, SCENARIO_FILE]

# Subprocesses started by the tests; process_lock guards access to the list.
processes = []
process_lock = threading.Lock()
41
42
def _kill_quietly(child):
    """Kill `child`, deliberately ignoring any Exception the attempt raises."""
    try:
        child.kill()
    except Exception:  # pylint: disable=broad-except
        pass


# atexit.register returns the function it is given, so the decorator form
# registers the handler and leaves the name bound to the function itself.
@atexit.register
def cleanup_processes():
    """Attempt to kill every subprocess recorded in `processes`.

    Runs at interpreter exit so that subprocesses we may have left running
    are cleaned up. The lock is held for the whole sweep, and a failure to
    kill one process does not stop the remaining ones from being tried.
    """
    with process_lock:
        for child in processes:
            _kill_quietly(child)
55
56
def wait(process):
    """Record `process` for exit-time cleanup, then block until it exits.

    Args:
        process: An object exposing `wait()`; the tests pass a
            `subprocess.Popen` instance.
    """
    # Register first so the process is tracked even if the wait below
    # never returns.
    with process_lock:
        processes.append(process)
    # Wait outside the lock; only the list mutation needs guarding.
    process.wait()
61
62
class ServerShutdown(unittest.TestCase):
    """Runs each server-shutdown scenario in its own subprocess.

    The tests make no assertion on a scenario's outcome: each one simply
    blocks until its subprocess exits.
    """

    def _run_scenario(self, scenario):
        """Launch the scenario script for `scenario` and wait for it to exit."""
        command = BASE_COMMAND + [scenario]
        # The child writes to this process's own stdout/stderr.
        child = subprocess.Popen(
            command,
            stdout=sys.stdout,
            stderr=sys.stderr,
        )
        wait(child)

    # Currently we shut down a server (if possible) after the Python server
    # instance is garbage collected. This behavior may change in the future.
    def test_deallocated_server_stops(self):
        self._run_scenario(_server_shutdown_scenarios.SERVER_DEALLOCATED)

    def test_server_exception_exits(self):
        self._run_scenario(_server_shutdown_scenarios.SERVER_RAISES_EXCEPTION)

    @unittest.skipIf(os.name == "nt", "fork not supported on windows")
    def test_server_fork_can_exit(self):
        self._run_scenario(_server_shutdown_scenarios.SERVER_FORK_CAN_EXIT)
90
91
if __name__ == "__main__":
    # Direct execution: set up default logging, then hand control to the
    # unittest runner with per-test output.
    logging.basicConfig()
    unittest.main(verbosity=2)
95