1# Copyright 2019 The gRPC Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from __future__ import division
16
17from concurrent import futures
18import datetime
19import threading
20import time
21import unittest
22
23import grpc
24
25from tests.unit.framework.common import test_constants
26
27_WAIT_FOR_BLOCKING = datetime.timedelta(seconds=1)
28
29
30def _block_on_waiting(server, termination_event, timeout=None):
31    server.start()
32    server.wait_for_termination(timeout=timeout)
33    termination_event.set()
34
35
36class ServerWaitForTerminationTest(unittest.TestCase):
37    def test_unblock_by_invoking_stop(self):
38        termination_event = threading.Event()
39        server = grpc.server(futures.ThreadPoolExecutor())
40
41        wait_thread = threading.Thread(
42            target=_block_on_waiting,
43            args=(
44                server,
45                termination_event,
46            ),
47        )
48        wait_thread.daemon = True
49        wait_thread.start()
50        time.sleep(_WAIT_FOR_BLOCKING.total_seconds())
51
52        server.stop(None)
53        termination_event.wait(timeout=test_constants.SHORT_TIMEOUT)
54        self.assertTrue(termination_event.is_set())
55
56    def test_unblock_by_del(self):
57        termination_event = threading.Event()
58        server = grpc.server(futures.ThreadPoolExecutor())
59
60        wait_thread = threading.Thread(
61            target=_block_on_waiting,
62            args=(
63                server,
64                termination_event,
65            ),
66        )
67        wait_thread.daemon = True
68        wait_thread.start()
69        time.sleep(_WAIT_FOR_BLOCKING.total_seconds())
70
71        # Invoke manually here, in Python 2 it will be invoked by GC sometime.
72        server.__del__()
73        termination_event.wait(timeout=test_constants.SHORT_TIMEOUT)
74        self.assertTrue(termination_event.is_set())
75
76    def test_unblock_by_timeout(self):
77        termination_event = threading.Event()
78        server = grpc.server(futures.ThreadPoolExecutor())
79
80        wait_thread = threading.Thread(
81            target=_block_on_waiting,
82            args=(
83                server,
84                termination_event,
85                test_constants.SHORT_TIMEOUT / 2,
86            ),
87        )
88        wait_thread.daemon = True
89        wait_thread.start()
90
91        termination_event.wait(timeout=test_constants.SHORT_TIMEOUT)
92        self.assertTrue(termination_event.is_set())
93
94
95if __name__ == "__main__":
96    unittest.main(verbosity=2)
97