xref: /aosp_15_r20/external/grpc-grpc/src/python/grpcio_tests/tests/_result.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from __future__ import absolute_import
16
17import collections
18import io
19import itertools
20import traceback
21import unittest
22from xml.etree import ElementTree
23
24import coverage
25
26from tests import _loader
27
28
class CaseResult(
    collections.namedtuple(
        "CaseResult",
        ["id", "name", "kind", "stdout", "stderr", "skip_reason", "traceback"],
    )
):
    """A serializable result of a single test case.

    Attributes:
      id (object): Any serializable object used to denote the identity of this
        test case. Required (must not be None).
      name (str or None): A human-readable name of the test case.
      kind (str): One of the CaseResult.Kind constants.
      stdout (object or None): Output on stdout, or None if nothing was captured.
      stderr (object or None): Output on stderr, or None if nothing was captured.
      skip_reason (object or None): The reason the test was skipped. Required
        when kind is CaseResult.Kind.SKIP.
      traceback (object or None): The traceback of the test. Required when kind
        is CaseResult.Kind.{ERROR, FAILURE, EXPECTED_FAILURE}.
    """

    class Kind(object):
        """Namespace of the string constants naming each result state."""

        UNTESTED = "untested"
        RUNNING = "running"
        ERROR = "error"
        FAILURE = "failure"
        SUCCESS = "success"
        SKIP = "skip"
        EXPECTED_FAILURE = "expected failure"
        UNEXPECTED_SUCCESS = "unexpected success"

    def __new__(
        cls,
        id=None,
        name=None,
        kind=None,
        stdout=None,
        stderr=None,
        skip_reason=None,
        traceback=None,
    ):
        """Helper keyword constructor for the namedtuple.

        See this class' attributes for information on the arguments.

        The kind is matched by equality against the CaseResult.Kind constants
        and the matching constant itself is stored. This lets a value whose
        kind string is equal but not identical to a constant (for instance one
        rebuilt by deserialization) be accepted, while identity comparisons
        against CaseResult.Kind made elsewhere keep working.

        Raises:
          AssertionError: If id is None, name is neither None nor a str, kind is
            not a known kind, or a field required by the kind is missing.
        """
        assert id is not None, "id is required"
        assert name is None or isinstance(
            name, str
        ), "name must be a str or None"
        kinds = CaseResult.Kind
        for canonical in (
            kinds.UNTESTED,
            kinds.RUNNING,
            kinds.ERROR,
            kinds.FAILURE,
            kinds.SUCCESS,
            kinds.SKIP,
            kinds.EXPECTED_FAILURE,
            kinds.UNEXPECTED_SUCCESS,
        ):
            if kind == canonical:
                # Normalize to the constant object itself.
                kind = canonical
                break
        else:
            assert False, "unknown kind: {!r}".format(kind)
        if kind in (kinds.ERROR, kinds.FAILURE, kinds.EXPECTED_FAILURE):
            assert traceback is not None, "{} requires a traceback".format(
                kind
            )
        elif kind == kinds.SKIP:
            assert skip_reason is not None, "skip requires a skip_reason"
        # super() takes (type, object-or-subtype): the defining class first,
        # then cls, so that subclasses of CaseResult can be constructed too.
        return super(CaseResult, cls).__new__(
            cls, id, name, kind, stdout, stderr, skip_reason, traceback
        )

    def updated(
        self,
        name=None,
        kind=None,
        stdout=None,
        stderr=None,
        skip_reason=None,
        traceback=None,
    ):
        """Get a new validated CaseResult with the fields updated.

        See this class' attributes for information on the arguments. An
        argument left as None keeps the current value of that field, so this
        method cannot reset a field back to None. The id is always preserved.

        Returns:
          CaseResult: A new instance; self is not modified.
        """
        name = self.name if name is None else name
        kind = self.kind if kind is None else kind
        stdout = self.stdout if stdout is None else stdout
        stderr = self.stderr if stderr is None else stderr
        skip_reason = self.skip_reason if skip_reason is None else skip_reason
        traceback = self.traceback if traceback is None else traceback
        return CaseResult(
            id=self.id,
            name=name,
            kind=kind,
            stdout=stdout,
            stderr=stderr,
            skip_reason=skip_reason,
            traceback=traceback,
        )
125
126
class AugmentedResult(unittest.TestResult):
    """unittest.TestResult that records a CaseResult for every test.

    Each test seen by this result is tracked as a CaseResult, which carries
    more detail than the standard unittest bookkeeping (for example captured
    standard output).

    Attributes:
      id_map (callable): A unary callable mapping unittest.TestCase objects to
        unique identifiers.
      cases (dict or None): Maps identifiers returned by id_map to the
        CaseResult for that identifier. None until startTestRun is called.
    """

    def __init__(self, id_map):
        """Store the identifier mapping; no cases are tracked yet.

        Arguments:
          id_map (callable): Corresponds to the attribute `id_map`."""
        super(AugmentedResult, self).__init__()
        self.id_map = id_map
        self.cases = None

    def _amend(self, test, **changes):
        """Replace the stored CaseResult of `test` with an updated copy.

        Args:
          test (unittest.TestCase): The test whose entry is rewritten.
          **changes: Keyword arguments forwarded to CaseResult.updated.
        """
        key = self.id_map(test)
        self.cases[key] = self.cases[key].updated(**changes)

    def startTestRun(self):
        """See unittest.TestResult.startTestRun."""
        super(AugmentedResult, self).startTestRun()
        self.cases = {}

    def startTest(self, test):
        """See unittest.TestResult.startTest."""
        super(AugmentedResult, self).startTest(test)
        key = self.id_map(test)
        # Every test begins life in the RUNNING state.
        entry = CaseResult(
            id=key, name=test.id(), kind=CaseResult.Kind.RUNNING
        )
        self.cases[key] = entry

    def addError(self, test, err):
        """See unittest.TestResult.addError."""
        super(AugmentedResult, self).addError(test, err)
        self._amend(test, kind=CaseResult.Kind.ERROR, traceback=err)

    def addFailure(self, test, err):
        """See unittest.TestResult.addFailure."""
        super(AugmentedResult, self).addFailure(test, err)
        self._amend(test, kind=CaseResult.Kind.FAILURE, traceback=err)

    def addSuccess(self, test):
        """See unittest.TestResult.addSuccess."""
        super(AugmentedResult, self).addSuccess(test)
        self._amend(test, kind=CaseResult.Kind.SUCCESS)

    def addSkip(self, test, reason):
        """See unittest.TestResult.addSkip."""
        super(AugmentedResult, self).addSkip(test, reason)
        self._amend(test, kind=CaseResult.Kind.SKIP, skip_reason=reason)

    def addExpectedFailure(self, test, err):
        """See unittest.TestResult.addExpectedFailure."""
        super(AugmentedResult, self).addExpectedFailure(test, err)
        self._amend(
            test, kind=CaseResult.Kind.EXPECTED_FAILURE, traceback=err
        )

    def addUnexpectedSuccess(self, test):
        """See unittest.TestResult.addUnexpectedSuccess."""
        super(AugmentedResult, self).addUnexpectedSuccess(test)
        self._amend(test, kind=CaseResult.Kind.UNEXPECTED_SUCCESS)

    def set_output(self, test, stdout, stderr):
        """Attach captured output to the CaseResult of a test.

        Args:
          test (unittest.TestCase): The TestCase to set the outputs of.
          stdout: Captured stdout; its decode() result is stored, so it must
            provide a decode() method (e.g. bytes).
          stderr: Captured stderr; handled the same way as stdout.
        """
        key = self.id_map(test)
        current = self.cases[key]
        self.cases[key] = current.updated(
            stdout=stdout.decode(), stderr=stderr.decode()
        )

    def augmented_results(self, filter):
        """Lazily yield the tracked case results accepted by a predicate.

        Args:
          filter (callable): A unary predicate to filter over CaseResult objects.

        Returns:
          A generator over the CaseResult objects for which filter is truthy.
        """
        return (
            self.cases[key] for key in self.cases if filter(self.cases[key])
        )
235
236
class CoverageResult(AugmentedResult):
    """AugmentedResult that records coverage.py data around every test.

    Attributes:
      coverage_context (coverage.Coverage or None): The coverage.py object
        created for the test currently running; None outside of a test.
    """

    def __init__(self, id_map):
        """See AugmentedResult.__init__."""
        super(CoverageResult, self).__init__(id_map=id_map)
        self.coverage_context = None

    def startTest(self, test):
        """See unittest.TestResult.startTest.

        After the base bookkeeping, creates a fresh coverage.Coverage object
        (with data_suffix=True) and starts it."""
        super(CoverageResult, self).startTest(test)
        tracker = coverage.Coverage(data_suffix=True)
        self.coverage_context = tracker
        tracker.start()

    def stopTest(self, test):
        """See unittest.TestResult.stopTest.

        After the base bookkeeping, stops the coverage object started in
        startTest, saves its data and drops the reference to it."""
        super(CoverageResult, self).stopTest(test)
        tracker = self.coverage_context
        tracker.stop()
        tracker.save()
        self.coverage_context = None
265
266
class _Colors(object):
    """ANSI escape sequences used to decorate terminal output."""

    # Text attributes.
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"

    # Foreground colours, named after the role they play in the reports.
    FAIL = "\033[91m"
    OK = "\033[92m"
    WARN = "\033[93m"
    INFO = "\033[94m"
    HEADER = "\033[95m"

    # Resets every attribute set by the sequences above.
    END = "\033[0m"
278
279
class TerminalResult(CoverageResult):
    """CoverageResult that also prints coloured live progress to a stream."""

    def __init__(self, out, id_map):
        """Remember the output stream and delegate the rest to the base class.

        Args:
          out (file-like): Output file to which terminal-colored live results will
            be written.
          id_map (callable): See AugmentedResult.__init__.
        """
        super(TerminalResult, self).__init__(id_map=id_map)
        self.out = out

    def _announce(self, color, line_template, test):
        """Write one coloured status line for `test` and flush the stream.

        Args:
          color (str): Escape sequence emitted before the line.
          line_template (str): Format string with one slot for the test id.
          test (unittest.TestCase): The test being reported.
        """
        stream = self.out
        stream.write(color + line_template.format(test.id()) + _Colors.END)
        stream.flush()

    def startTestRun(self):
        """See unittest.TestResult.startTestRun."""
        super(TerminalResult, self).startTestRun()
        banner = _Colors.HEADER + "Testing gRPC Python...\n" + _Colors.END
        self.out.write(banner)

    def stopTestRun(self):
        """See unittest.TestResult.stopTestRun."""
        super(TerminalResult, self).stopTestRun()
        stream = self.out
        stream.write(summary(self))
        stream.flush()

    def addError(self, test, err):
        """See unittest.TestResult.addError."""
        super(TerminalResult, self).addError(test, err)
        self._announce(_Colors.FAIL, "ERROR         {}\n", test)

    def addFailure(self, test, err):
        """See unittest.TestResult.addFailure."""
        super(TerminalResult, self).addFailure(test, err)
        self._announce(_Colors.FAIL, "FAILURE       {}\n", test)

    def addSuccess(self, test):
        """See unittest.TestResult.addSuccess."""
        super(TerminalResult, self).addSuccess(test)
        self._announce(_Colors.OK, "SUCCESS       {}\n", test)

    def addSkip(self, test, reason):
        """See unittest.TestResult.addSkip."""
        super(TerminalResult, self).addSkip(test, reason)
        self._announce(_Colors.INFO, "SKIP          {}\n", test)

    def addExpectedFailure(self, test, err):
        """See unittest.TestResult.addExpectedFailure."""
        super(TerminalResult, self).addExpectedFailure(test, err)
        self._announce(_Colors.INFO, "FAILURE_OK    {}\n", test)

    def addUnexpectedSuccess(self, test):
        """See unittest.TestResult.addUnexpectedSuccess."""
        super(TerminalResult, self).addUnexpectedSuccess(test)
        self._announce(_Colors.INFO, "UNEXPECTED_OK {}\n", test)
354
355
def _traceback_string(type, value, trace):
    """Render an exception triple as the text Python would print for it.

    Args:
      type (class): The type of the exception.
      value (Exception): The value of the exception.
      trace (traceback): Traceback of the exception.

    Returns:
      str: Formatted exception descriptive string.
    """
    rendered_lines = traceback.format_exception(type, value, trace)
    return "".join(rendered_lines)
370
371
def summary(result):
    """A summary string of a result object.

    The summary has three parts: per-kind counts together with the names of
    tests still marked as running, a traceback/stdout/stderr report for every
    failed or errored test, and the names of unexpectedly successful tests.
    Skipped tests are counted separately and are not part of the "finished"
    total.

    Args:
      result (AugmentedResult): The result object to get the summary of.

    Returns:
      str: The summary string.
    """
    assert isinstance(result, AugmentedResult)
    kinds = CaseResult.Kind

    # Group all cases by kind in a single pass rather than re-scanning the
    # whole collection once per kind.
    by_kind = {}
    for case in result.augmented_results(lambda _case: True):
        by_kind.setdefault(case.kind, []).append(case)

    running = by_kind.get(kinds.RUNNING, [])
    failures = by_kind.get(kinds.FAILURE, [])
    errors = by_kind.get(kinds.ERROR, [])
    successes = by_kind.get(kinds.SUCCESS, [])
    skips = by_kind.get(kinds.SKIP, [])
    expected_failures = by_kind.get(kinds.EXPECTED_FAILURE, [])
    unexpected_successes = by_kind.get(kinds.UNEXPECTED_SUCCESS, [])

    running_names = [case.name for case in running]
    finished_count = (
        len(failures)
        + len(errors)
        + len(successes)
        + len(expected_failures)
        + len(unexpected_successes)
    )
    statistics = (
        "{finished} tests finished:\n"
        "\t{successful} successful\n"
        "\t{unsuccessful} unsuccessful\n"
        "\t{skipped} skipped\n"
        "\t{expected_fail} expected failures\n"
        "\t{unexpected_successful} unexpected successes\n"
        "Interrupted Tests:\n"
        "\t{interrupted}\n".format(
            finished=finished_count,
            successful=len(successes),
            unsuccessful=(len(failures) + len(errors)),
            skipped=len(skips),
            expected_fail=len(expected_failures),
            unexpected_successful=len(unexpected_successes),
            interrupted=str(running_names),
        )
    )

    # The per-test report layout is the same for every entry; build it once.
    report_template = (
        _Colors.FAIL
        + "{test_name}"
        + _Colors.END
        + "\n"
        + _Colors.BOLD
        + "traceback:"
        + _Colors.END
        + "\n"
        + "{traceback}\n"
        + _Colors.BOLD
        + "stdout:"
        + _Colors.END
        + "\n"
        + "{stdout}\n"
        + _Colors.BOLD
        + "stderr:"
        + _Colors.END
        + "\n"
        + "{stderr}\n"
    )
    # Failures are listed before errors.
    tracebacks = "\n\n".join(
        report_template.format(
            test_name=case.name,
            traceback=_traceback_string(*case.traceback),
            stdout=case.stdout,
            stderr=case.stderr,
        )
        for case in itertools.chain(failures, errors)
    )
    notes = "Unexpected successes: {}\n".format(
        [case.name for case in unexpected_successes]
    )
    return statistics + "\nErrors/Failures: \n" + tracebacks + "\n" + notes
485
486
def _xml_safe_text(text):
    """Return `text` without the characters that XML 1.0 does not allow."""
    return "".join(
        char
        for char in text
        if (
            char in "\t\n\r"
            or 0x20 <= ord(char) <= 0xD7FF
            or 0xE000 <= ord(char) <= 0xFFFD
            or 0x10000 <= ord(char) <= 0x10FFFF
        )
    )


def jenkins_junit_xml(result):
    """An XML tree object that when written is recognizable by Jenkins.

    One <testcase> element is emitted for every successful, errored or failed
    case; cases of any other kind are left out. Errored and failed cases carry
    an <error> child whose text is the captured stderr followed by the
    formatted traceback.

    Args:
      result (AugmentedResult): The result object to get the junit xml output of.

    Returns:
      ElementTree.ElementTree: The XML tree.
    """
    assert isinstance(result, AugmentedResult)
    kinds = CaseResult.Kind
    root = ElementTree.Element("testsuites")
    suite = ElementTree.SubElement(
        root,
        "testsuite",
        {
            "name": "Python gRPC tests",
        },
    )
    for case in result.cases.values():
        if case.kind == kinds.SUCCESS:
            ElementTree.SubElement(
                suite,
                "testcase",
                {
                    "name": case.name,
                },
            )
        elif case.kind in (kinds.ERROR, kinds.FAILURE):
            case_xml = ElementTree.SubElement(
                suite,
                "testcase",
                {
                    "name": case.name,
                },
            )
            error_xml = ElementTree.SubElement(case_xml, "error", {})
            stderr_text = "" if case.stderr is None else str(case.stderr)
            traceback_text = (
                ""
                if case.traceback is None
                else _traceback_string(*case.traceback)
            )
            # Captured output may hold control characters (e.g. terminal
            # escape codes) that would make the written document invalid XML.
            error_xml.text = _xml_safe_text(
                "{}\n{}".format(stderr_text, traceback_text)
            )
    return ElementTree.ElementTree(element=root)
525