xref: /aosp_15_r20/external/grpc-grpc/src/python/grpcio_tests/tests/_runner.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from __future__ import absolute_import
16
17import collections
18import io
19import os
20import select
21import signal
22import sys
23import tempfile
24import threading
25import time
26import unittest
27import uuid
28
29from tests import _loader
30from tests import _result
31
32
class CaptureFile(object):
    """A context-managed file to redirect output to a byte array.

    Use by invoking `start` (`__enter__`) and at some point invoking `stop`
    (`__exit__`). At any point after the initial call to `start` call `output` to
    get the current redirected output. Note that we don't currently use file
    locking, so calling `output` between calls to `start` and `stop` may muddle
    the result (you should only be doing this during a Python-handled interrupt as
    a last ditch effort to provide output to the user).

    Attributes:
      _redirected_fd (int): File descriptor of file to redirect writes from.
      _saved_fd (int or None): A copy of the original value of the redirected
        file descriptor. None once `close` has been called.
      _into_file (TemporaryFile or None): File to which writes are redirected.
        None until the first call to `start`.
    """

    def __init__(self, fd):
        """Remember `fd` and keep a duplicate of what it currently refers to.

        Arguments:
          fd (int): The file descriptor whose writes will be captured.
        """
        self._redirected_fd = fd
        self._saved_fd = os.dup(self._redirected_fd)
        self._into_file = None

    def output(self):
        """Get all output from the redirected-to file if it exists.

        Returns:
          bytes: Everything written to the capture file so far, or empty bytes
            when `start` has never been called.
        """
        if self._into_file is None:
            return bytes()
        self._into_file.seek(0)
        return bytes(self._into_file.read())

    def start(self):
        """Start redirection of writes to the file descriptor."""
        previous = self._into_file
        self._into_file = tempfile.TemporaryFile()
        os.dup2(self._into_file.fileno(), self._redirected_fd)
        # The capture file of an earlier start() is unreachable from now on;
        # close it explicitly rather than waiting for garbage collection.
        if previous is not None:
            previous.close()

    def stop(self):
        """Stop redirection of writes to the file descriptor."""
        # n.b. this dup2 call auto-closes self._redirected_fd
        os.dup2(self._saved_fd, self._redirected_fd)

    def write_bypass(self, value):
        """Bypass the redirection and write directly to the original file.

        Arguments:
          value (str or bytes): What to write to the original file. Text is
            encoded as ASCII; characters outside ASCII are written as
            backslash escapes so that this last-resort path never raises an
            encoding error.
        """
        if not isinstance(value, bytes):
            value = value.encode("ascii", "backslashreplace")
        if self._saved_fd is None:
            # close() has released the saved copy; fall back to the descriptor
            # itself.
            os.write(self._redirected_fd, value)
        else:
            os.write(self._saved_fd, value)

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, type, value, traceback):
        self.stop()

    def close(self):
        """Close any resources used by self not closed by stop().

        Safe to call more than once. The capture file is left open so that
        `output` keeps returning the last captured bytes.
        """
        if self._saved_fd is not None:
            os.close(self._saved_fd)
            self._saved_fd = None
97
98
class AugmentedCase(collections.namedtuple("AugmentedCase", ["case", "id"])):
    """A test case with a guaranteed unique externally specified identifier.

    Attributes:
      case (unittest.TestCase): TestCase we're decorating with an additional
        identifier.
      id (object): Any identifier that may be considered 'unique' for testing
        purposes.
    """

    def __new__(cls, case, id=None):
        """Build the tuple, generating a random UUID when no id is supplied.

        Arguments:
          case: The test case being decorated.
          id: Optional identifier; defaults to a fresh `uuid.uuid4()`.
        """
        if id is None:
            id = uuid.uuid4()
        # super() takes (type, object-or-subtype); with the arguments reversed,
        # constructing any subclass of AugmentedCase raises TypeError.
        return super(AugmentedCase, cls).__new__(cls, case, id)
113
114
115# NOTE(lidiz) This complex wrapper is not triggering setUpClass nor
116# tearDownClass. Do not use those methods, or fix this wrapper!
class Runner(object):
    """Runs every test case of a suite and writes a JUnit-style report.

    In dedicated-thread mode each case runs in its own thread while stdout and
    stderr are captured per case; otherwise cases run on the calling thread
    without capture.
    """

    def __init__(self, dedicated_threads=False):
        """Constructs the Runner object.

        Args:
          dedicated_threads: A bool indicates whether to spawn each unit test
            in separate thread or not.
        """
        self._skipped_tests = []
        self._dedicated_threads = dedicated_threads

    def skip_tests(self, tests):
        """Set the substrings used to skip tests.

        Args:
          tests: An iterable of strings; a case is skipped when any of them
            occurs in the case's id.
        """
        self._skipped_tests = tests

    @staticmethod
    def _drain_buffer(buffer):
        """Return the text held by `buffer` and leave it empty.

        truncate() alone does not move the stream position, so without the
        seek(0) the next write would be padded with NUL characters up to the
        old position.
        """
        text = buffer.getvalue()
        buffer.seek(0)
        buffer.truncate()
        return text

    def _is_skipped(self, case):
        """Tell whether any configured skip substring occurs in the case id."""
        return any(skipped in case.id() for skipped in self._skipped_tests)

    def run(self, suite):
        """See setuptools' test_runner setup argument for information.

        Only cases whose id starts with the value of the
        GRPC_PYTHON_TESTRUNNER_FILTER environment variable are run (all cases
        when it is unset or empty). Results are written to stdout and to
        "report.xml" in the current directory.

        Args:
          suite: The test suite whose cases should be run.

        Returns:
          The result object that collected the outcome of every case.
        """
        # only run test cases with id starting with given prefix
        testcase_filter = os.getenv("GRPC_PYTHON_TESTRUNNER_FILTER")
        filtered_cases = [
            case
            for case in _loader.iterate_suite_cases(suite)
            if not testcase_filter or case.id().startswith(testcase_filter)
        ]

        # Ensure that every test case has no collision with any other test case in
        # the augmented results.
        augmented_cases = [
            AugmentedCase(case, uuid.uuid4()) for case in filtered_cases
        ]
        case_id_by_case = {
            augmented_case.case: augmented_case.id
            for augmented_case in augmented_cases
        }
        result_out = io.StringIO()
        result = _result.TerminalResult(
            result_out, id_map=lambda case: case_id_by_case[case]
        )
        stdout_pipe = CaptureFile(sys.stdout.fileno())
        stderr_pipe = CaptureFile(sys.stderr.fileno())
        kill_flag = [False]

        def sigint_handler(signal_number, frame):
            # Only record the request; the polling loop acts on it. Restoring
            # the default handler lets a second interrupt terminate at once.
            if signal_number == signal.SIGINT:
                kill_flag[0] = True  # Python 2.7 not having 'local'... :-(
            signal.signal(signal_number, signal.SIG_DFL)

        def fault_handler(signal_number, frame):
            # Assemble the message as bytes so the captured output is emitted
            # verbatim instead of as a "b'...'" repr, and so no encoding step
            # can fail while handling a fault.
            header = "Received fault signal {}\nstdout:\n".format(
                signal_number
            ).encode("ascii")
            stdout_pipe.write_bypass(
                b"".join(
                    [
                        header,
                        stdout_pipe.output(),
                        b"\n\nstderr:",
                        stderr_pipe.output(),
                        b"\n",
                    ]
                )
            )
            os._exit(1)

        def check_kill_self():
            if not kill_flag[0]:
                return
            stdout_pipe.write_bypass("Stopping tests short...")
            result.stopTestRun()
            # Everything below is passed as bytes: captured output is not
            # guaranteed to be ASCII, or even valid text.
            stdout_pipe.write_bypass(result_out.getvalue().encode("utf-8"))
            stdout_pipe.write_bypass(
                b"\ninterrupted stdout:\n" + stdout_pipe.output() + b"\n"
            )
            stderr_pipe.write_bypass(
                b"\ninterrupted stderr:\n" + stderr_pipe.output() + b"\n"
            )
            os._exit(1)

        def try_set_handler(name, handler):
            # Not every platform defines every signal; skip the missing ones.
            try:
                signal.signal(getattr(signal, name), handler)
            except AttributeError:
                pass

        try_set_handler("SIGINT", sigint_handler)
        try_set_handler("SIGBUS", fault_handler)
        try_set_handler("SIGABRT", fault_handler)
        try_set_handler("SIGFPE", fault_handler)
        try_set_handler("SIGILL", fault_handler)
        # Sometimes output will lag after a test has successfully finished; we
        # ignore such writes to our pipes.
        try_set_handler("SIGPIPE", signal.SIG_IGN)

        # Run the tests
        result.startTestRun()
        try:
            for augmented_case in augmented_cases:
                if self._is_skipped(augmented_case.case):
                    continue
                sys.stdout.write(
                    "Running       {}\n".format(augmented_case.case.id())
                )
                sys.stdout.flush()
                if not self._dedicated_threads:
                    # Donates current thread to test case execution.
                    augmented_case.case.run(result)
                    continue
                # (Deprecated) Spawns dedicated thread for each test case.
                case_thread = threading.Thread(
                    target=augmented_case.case.run, args=(result,)
                )
                with stdout_pipe, stderr_pipe:
                    case_thread.start()
                    # If the thread is exited unexpected, stop testing.
                    while case_thread.is_alive():
                        check_kill_self()
                        time.sleep(0)
                    case_thread.join()
                # Records the result of the test case run.
                result.set_output(
                    augmented_case.case,
                    stdout_pipe.output(),
                    stderr_pipe.output(),
                )
                sys.stdout.write(self._drain_buffer(result_out))
                sys.stdout.flush()
                check_kill_self()
            result.stopTestRun()
        finally:
            # Release the duplicated descriptors even if a test run raised.
            stdout_pipe.close()
            stderr_pipe.close()

        # Report results
        sys.stdout.write(result_out.getvalue())
        sys.stdout.flush()
        signal.signal(signal.SIGINT, signal.SIG_DFL)
        with open("report.xml", "wb") as report_xml_file:
            _result.jenkins_junit_xml(result).write(report_xml_file)
        return result
253