1# Copyright 2019 The gRPC Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15from __future__ import division 16 17from concurrent import futures 18import datetime 19import threading 20import time 21import unittest 22 23import grpc 24 25from tests.unit.framework.common import test_constants 26 27_WAIT_FOR_BLOCKING = datetime.timedelta(seconds=1) 28 29 30def _block_on_waiting(server, termination_event, timeout=None): 31 server.start() 32 server.wait_for_termination(timeout=timeout) 33 termination_event.set() 34 35 36class ServerWaitForTerminationTest(unittest.TestCase): 37 def test_unblock_by_invoking_stop(self): 38 termination_event = threading.Event() 39 server = grpc.server(futures.ThreadPoolExecutor()) 40 41 wait_thread = threading.Thread( 42 target=_block_on_waiting, 43 args=( 44 server, 45 termination_event, 46 ), 47 ) 48 wait_thread.daemon = True 49 wait_thread.start() 50 time.sleep(_WAIT_FOR_BLOCKING.total_seconds()) 51 52 server.stop(None) 53 termination_event.wait(timeout=test_constants.SHORT_TIMEOUT) 54 self.assertTrue(termination_event.is_set()) 55 56 def test_unblock_by_del(self): 57 termination_event = threading.Event() 58 server = grpc.server(futures.ThreadPoolExecutor()) 59 60 wait_thread = threading.Thread( 61 target=_block_on_waiting, 62 args=( 63 server, 64 termination_event, 65 ), 66 ) 67 wait_thread.daemon = True 68 wait_thread.start() 69 time.sleep(_WAIT_FOR_BLOCKING.total_seconds()) 70 71 # Invoke manually here, in Python 2 it will be invoked by GC sometime. 72 server.__del__() 73 termination_event.wait(timeout=test_constants.SHORT_TIMEOUT) 74 self.assertTrue(termination_event.is_set()) 75 76 def test_unblock_by_timeout(self): 77 termination_event = threading.Event() 78 server = grpc.server(futures.ThreadPoolExecutor()) 79 80 wait_thread = threading.Thread( 81 target=_block_on_waiting, 82 args=( 83 server, 84 termination_event, 85 test_constants.SHORT_TIMEOUT / 2, 86 ), 87 ) 88 wait_thread.daemon = True 89 wait_thread.start() 90 91 termination_event.wait(timeout=test_constants.SHORT_TIMEOUT) 92 self.assertTrue(termination_event.is_set()) 93 94 95if __name__ == "__main__": 96 unittest.main(verbosity=2) 97