1# Copyright 2015 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15from __future__ import absolute_import 16 17import collections 18import io 19import os 20import select 21import signal 22import sys 23import tempfile 24import threading 25import time 26import unittest 27import uuid 28 29from tests import _loader 30from tests import _result 31 32 33class CaptureFile(object): 34 """A context-managed file to redirect output to a byte array. 35 36 Use by invoking `start` (`__enter__`) and at some point invoking `stop` 37 (`__exit__`). At any point after the initial call to `start` call `output` to 38 get the current redirected output. Note that we don't currently use file 39 locking, so calling `output` between calls to `start` and `stop` may muddle 40 the result (you should only be doing this during a Python-handled interrupt as 41 a last ditch effort to provide output to the user). 42 43 Attributes: 44 _redirected_fd (int): File descriptor of file to redirect writes from. 45 _saved_fd (int): A copy of the original value of the redirected file 46 descriptor. 47 _into_file (TemporaryFile or None): File to which writes are redirected. 48 Only non-None when self is started. 
49 """ 50 51 def __init__(self, fd): 52 self._redirected_fd = fd 53 self._saved_fd = os.dup(self._redirected_fd) 54 self._into_file = None 55 56 def output(self): 57 """Get all output from the redirected-to file if it exists.""" 58 if self._into_file: 59 self._into_file.seek(0) 60 return bytes(self._into_file.read()) 61 else: 62 return bytes() 63 64 def start(self): 65 """Start redirection of writes to the file descriptor.""" 66 self._into_file = tempfile.TemporaryFile() 67 os.dup2(self._into_file.fileno(), self._redirected_fd) 68 69 def stop(self): 70 """Stop redirection of writes to the file descriptor.""" 71 # n.b. this dup2 call auto-closes self._redirected_fd 72 os.dup2(self._saved_fd, self._redirected_fd) 73 74 def write_bypass(self, value): 75 """Bypass the redirection and write directly to the original file. 76 77 Arguments: 78 value (str): What to write to the original file. 79 """ 80 if not isinstance(value, bytes): 81 value = value.encode("ascii") 82 if self._saved_fd is None: 83 os.write(self._redirect_fd, value) 84 else: 85 os.write(self._saved_fd, value) 86 87 def __enter__(self): 88 self.start() 89 return self 90 91 def __exit__(self, type, value, traceback): 92 self.stop() 93 94 def close(self): 95 """Close any resources used by self not closed by stop().""" 96 os.close(self._saved_fd) 97 98 99class AugmentedCase(collections.namedtuple("AugmentedCase", ["case", "id"])): 100 """A test case with a guaranteed unique externally specified identifier. 101 102 Attributes: 103 case (unittest.TestCase): TestCase we're decorating with an additional 104 identifier. 105 id (object): Any identifier that may be considered 'unique' for testing 106 purposes. 107 """ 108 109 def __new__(cls, case, id=None): 110 if id is None: 111 id = uuid.uuid4() 112 return super(cls, AugmentedCase).__new__(cls, case, id) 113 114 115# NOTE(lidiz) This complex wrapper is not triggering setUpClass nor 116# tearDownClass. Do not use those methods, or fix this wrapper! 
class Runner(object):
    """Runs the cases of a unittest suite one by one and reports the results.

    Results are accumulated in a `_result.TerminalResult`, echoed to stdout
    and written to `report.xml` in the current working directory.
    """

    def __init__(self, dedicated_threads=False):
        """Constructs the Runner object.

        Args:
          dedicated_threads: A bool indicates whether to spawn each unit test
            in separate thread or not.
        """
        self._skipped_tests = []
        self._dedicated_threads = dedicated_threads

    def skip_tests(self, tests):
        """Set the tests to skip.

        Args:
          tests: An iterable of strings; a test case is skipped when any of
            them is a substring of its id.
        """
        self._skipped_tests = tests

    def _is_skipped(self, case):
        """Return True if any configured skip string occurs in the case id."""
        case_id = case.id()
        return any(skipped in case_id for skipped in self._skipped_tests)

    def run(self, suite):
        """See setuptools' test_runner setup argument for information.

        Only cases whose id starts with the value of the
        GRPC_PYTHON_TESTRUNNER_FILTER environment variable are run (all cases
        when it is unset or empty). In dedicated-thread mode each case runs in
        its own thread with stdout/stderr redirected to temporary files and
        the captured bytes are attached to the result; otherwise the case runs
        on the calling thread without redirection.

        Args:
          suite: The test suite whose cases should be run.

        Returns:
          The `_result.TerminalResult` holding the outcome of the run.
        """
        # only run test cases with id starting with given prefix
        testcase_filter = os.getenv("GRPC_PYTHON_TESTRUNNER_FILTER")
        filtered_cases = [
            case
            for case in _loader.iterate_suite_cases(suite)
            if not testcase_filter or case.id().startswith(testcase_filter)
        ]

        # Ensure that every test case has no collision with any other test
        # case in the augmented results.
        augmented_cases = [
            AugmentedCase(case, uuid.uuid4()) for case in filtered_cases
        ]
        case_id_by_case = dict(
            (augmented_case.case, augmented_case.id)
            for augmented_case in augmented_cases
        )
        result_out = io.StringIO()
        result = _result.TerminalResult(
            result_out, id_map=lambda case: case_id_by_case[case]
        )
        stdout_pipe = CaptureFile(sys.stdout.fileno())
        stderr_pipe = CaptureFile(sys.stderr.fileno())
        # One-element list so the nested handlers can mutate the flag.
        kill_flag = [False]

        def sigint_handler(signal_number, frame):
            if signal_number == signal.SIGINT:
                kill_flag[0] = True
            # Restore the default action so a second signal is not deferred.
            signal.signal(signal_number, signal.SIG_DFL)

        def fault_handler(signal_number, frame):
            # The captured output is bytes; concatenate it as bytes so the
            # real content is shown instead of a "b'...'" repr.
            header = "Received fault signal {}\nstdout:\n".format(
                signal_number
            )
            stdout_pipe.write_bypass(
                header.encode("ascii")
                + stdout_pipe.output()
                + b"\n\nstderr:"
                + stderr_pipe.output()
                + b"\n"
            )
            os._exit(1)

        def check_kill_self():
            if not kill_flag[0]:
                return
            stdout_pipe.write_bypass("Stopping tests short...")
            result.stopTestRun()
            # Everything below is passed as bytes so that this last-ditch
            # report cannot fail on undecodable or non-ASCII output.
            stdout_pipe.write_bypass(
                result_out.getvalue().encode("utf-8", "backslashreplace")
            )
            stdout_pipe.write_bypass(
                b"\ninterrupted stdout:\n" + stdout_pipe.output() + b"\n"
            )
            stderr_pipe.write_bypass(
                b"\ninterrupted stderr:\n" + stderr_pipe.output() + b"\n"
            )
            os._exit(1)

        def try_set_handler(name, handler):
            # Not every platform defines every signal name.
            try:
                signal.signal(getattr(signal, name), handler)
            except AttributeError:
                pass

        try:
            try_set_handler("SIGINT", sigint_handler)
            try_set_handler("SIGBUS", fault_handler)
            try_set_handler("SIGABRT", fault_handler)
            try_set_handler("SIGFPE", fault_handler)
            try_set_handler("SIGILL", fault_handler)
            # Sometimes output will lag after a test has successfully
            # finished; we ignore such writes to our pipes.
            try_set_handler("SIGPIPE", signal.SIG_IGN)

            # Run the tests
            result.startTestRun()
            for augmented_case in augmented_cases:
                if self._is_skipped(augmented_case.case):
                    continue
                sys.stdout.write(
                    "Running {}\n".format(augmented_case.case.id())
                )
                sys.stdout.flush()
                if self._dedicated_threads:
                    # (Deprecated) Spawns dedicated thread for each test case.
                    case_thread = threading.Thread(
                        target=augmented_case.case.run, args=(result,)
                    )
                    with stdout_pipe, stderr_pipe:
                        case_thread.start()
                        # Poll so that an interrupt can stop the run while
                        # the test thread is still alive.
                        while case_thread.is_alive():
                            check_kill_self()
                            time.sleep(0)
                        case_thread.join()
                    # Records the result of the test case run.
                    result.set_output(
                        augmented_case.case,
                        stdout_pipe.output(),
                        stderr_pipe.output(),
                    )
                    sys.stdout.write(result_out.getvalue())
                    sys.stdout.flush()
                    # truncate() does not move the stream position; rewind
                    # first, otherwise the next write pads the buffer with
                    # NUL characters up to the old position.
                    result_out.seek(0)
                    result_out.truncate()
                    check_kill_self()
                else:
                    # Donates current thread to test case execution.
                    augmented_case.case.run(result)
            result.stopTestRun()
        finally:
            # Release the duplicated descriptors even if the run raised.
            stdout_pipe.close()
            stderr_pipe.close()

        # Report results
        sys.stdout.write(result_out.getvalue())
        sys.stdout.flush()
        signal.signal(signal.SIGINT, signal.SIG_DFL)
        with open("report.xml", "wb") as report_xml_file:
            _result.jenkins_junit_xml(result).write(report_xml_file)
        return result