1# Copyright 2015 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15from __future__ import absolute_import 16 17import collections 18import io 19import itertools 20import traceback 21import unittest 22from xml.etree import ElementTree 23 24import coverage 25 26from tests import _loader 27 28 29class CaseResult( 30 collections.namedtuple( 31 "CaseResult", 32 ["id", "name", "kind", "stdout", "stderr", "skip_reason", "traceback"], 33 ) 34): 35 """A serializable result of a single test case. 36 37 Attributes: 38 id (object): Any serializable object used to denote the identity of this 39 test case. 40 name (str or None): A human-readable name of the test case. 41 kind (CaseResult.Kind): The kind of test result. 42 stdout (object or None): Output on stdout, or None if nothing was captured. 43 stderr (object or None): Output on stderr, or None if nothing was captured. 44 skip_reason (object or None): The reason the test was skipped. Must be 45 something if self.kind is CaseResult.Kind.SKIP, else None. 46 traceback (object or None): The traceback of the test. Must be something if 47 self.kind is CaseResult.Kind.{ERROR, FAILURE, EXPECTED_FAILURE}, else 48 None. 49 """ 50 51 class Kind(object): 52 UNTESTED = "untested" 53 RUNNING = "running" 54 ERROR = "error" 55 FAILURE = "failure" 56 SUCCESS = "success" 57 SKIP = "skip" 58 EXPECTED_FAILURE = "expected failure" 59 UNEXPECTED_SUCCESS = "unexpected success" 60 61 def __new__( 62 cls, 63 id=None, 64 name=None, 65 kind=None, 66 stdout=None, 67 stderr=None, 68 skip_reason=None, 69 traceback=None, 70 ): 71 """Helper keyword constructor for the namedtuple. 72 73 See this class' attributes for information on the arguments.""" 74 assert id is not None 75 assert name is None or isinstance(name, str) 76 if kind is CaseResult.Kind.UNTESTED: 77 pass 78 elif kind is CaseResult.Kind.RUNNING: 79 pass 80 elif kind is CaseResult.Kind.ERROR: 81 assert traceback is not None 82 elif kind is CaseResult.Kind.FAILURE: 83 assert traceback is not None 84 elif kind is CaseResult.Kind.SUCCESS: 85 pass 86 elif kind is CaseResult.Kind.SKIP: 87 assert skip_reason is not None 88 elif kind is CaseResult.Kind.EXPECTED_FAILURE: 89 assert traceback is not None 90 elif kind is CaseResult.Kind.UNEXPECTED_SUCCESS: 91 pass 92 else: 93 assert False 94 return super(cls, CaseResult).__new__( 95 cls, id, name, kind, stdout, stderr, skip_reason, traceback 96 ) 97 98 def updated( 99 self, 100 name=None, 101 kind=None, 102 stdout=None, 103 stderr=None, 104 skip_reason=None, 105 traceback=None, 106 ): 107 """Get a new validated CaseResult with the fields updated. 108 109 See this class' attributes for information on the arguments.""" 110 name = self.name if name is None else name 111 kind = self.kind if kind is None else kind 112 stdout = self.stdout if stdout is None else stdout 113 stderr = self.stderr if stderr is None else stderr 114 skip_reason = self.skip_reason if skip_reason is None else skip_reason 115 traceback = self.traceback if traceback is None else traceback 116 return CaseResult( 117 id=self.id, 118 name=name, 119 kind=kind, 120 stdout=stdout, 121 stderr=stderr, 122 skip_reason=skip_reason, 123 traceback=traceback, 124 ) 125 126 127class AugmentedResult(unittest.TestResult): 128 """unittest.Result that keeps track of additional information. 129 130 Uses CaseResult objects to store test-case results, providing additional 131 information beyond that of the standard Python unittest library, such as 132 standard output. 133 134 Attributes: 135 id_map (callable): A unary callable mapping unittest.TestCase objects to 136 unique identifiers. 137 cases (dict): A dictionary mapping from the identifiers returned by id_map 138 to CaseResult objects corresponding to those IDs. 139 """ 140 141 def __init__(self, id_map): 142 """Initialize the object with an identifier mapping. 143 144 Arguments: 145 id_map (callable): Corresponds to the attribute `id_map`.""" 146 super(AugmentedResult, self).__init__() 147 self.id_map = id_map 148 self.cases = None 149 150 def startTestRun(self): 151 """See unittest.TestResult.startTestRun.""" 152 super(AugmentedResult, self).startTestRun() 153 self.cases = dict() 154 155 def startTest(self, test): 156 """See unittest.TestResult.startTest.""" 157 super(AugmentedResult, self).startTest(test) 158 case_id = self.id_map(test) 159 self.cases[case_id] = CaseResult( 160 id=case_id, name=test.id(), kind=CaseResult.Kind.RUNNING 161 ) 162 163 def addError(self, test, err): 164 """See unittest.TestResult.addError.""" 165 super(AugmentedResult, self).addError(test, err) 166 case_id = self.id_map(test) 167 self.cases[case_id] = self.cases[case_id].updated( 168 kind=CaseResult.Kind.ERROR, traceback=err 169 ) 170 171 def addFailure(self, test, err): 172 """See unittest.TestResult.addFailure.""" 173 super(AugmentedResult, self).addFailure(test, err) 174 case_id = self.id_map(test) 175 self.cases[case_id] = self.cases[case_id].updated( 176 kind=CaseResult.Kind.FAILURE, traceback=err 177 ) 178 179 def addSuccess(self, test): 180 """See unittest.TestResult.addSuccess.""" 181 super(AugmentedResult, self).addSuccess(test) 182 case_id = self.id_map(test) 183 self.cases[case_id] = self.cases[case_id].updated( 184 kind=CaseResult.Kind.SUCCESS 185 ) 186 187 def addSkip(self, test, reason): 188 """See unittest.TestResult.addSkip.""" 189 super(AugmentedResult, self).addSkip(test, reason) 190 case_id = self.id_map(test) 191 self.cases[case_id] = self.cases[case_id].updated( 192 kind=CaseResult.Kind.SKIP, skip_reason=reason 193 ) 194 195 def addExpectedFailure(self, test, err): 196 """See unittest.TestResult.addExpectedFailure.""" 197 super(AugmentedResult, self).addExpectedFailure(test, err) 198 case_id = self.id_map(test) 199 self.cases[case_id] = self.cases[case_id].updated( 200 kind=CaseResult.Kind.EXPECTED_FAILURE, traceback=err 201 ) 202 203 def addUnexpectedSuccess(self, test): 204 """See unittest.TestResult.addUnexpectedSuccess.""" 205 super(AugmentedResult, self).addUnexpectedSuccess(test) 206 case_id = self.id_map(test) 207 self.cases[case_id] = self.cases[case_id].updated( 208 kind=CaseResult.Kind.UNEXPECTED_SUCCESS 209 ) 210 211 def set_output(self, test, stdout, stderr): 212 """Set the output attributes for the CaseResult corresponding to a test. 213 214 Args: 215 test (unittest.TestCase): The TestCase to set the outputs of. 216 stdout (str): Output from stdout to assign to self.id_map(test). 217 stderr (str): Output from stderr to assign to self.id_map(test). 218 """ 219 case_id = self.id_map(test) 220 self.cases[case_id] = self.cases[case_id].updated( 221 stdout=stdout.decode(), stderr=stderr.decode() 222 ) 223 224 def augmented_results(self, filter): 225 """Convenience method to retrieve filtered case results. 226 227 Args: 228 filter (callable): A unary predicate to filter over CaseResult objects. 229 """ 230 return ( 231 self.cases[case_id] 232 for case_id in self.cases 233 if filter(self.cases[case_id]) 234 ) 235 236 237class CoverageResult(AugmentedResult): 238 """Extension to AugmentedResult adding coverage.py support per test.\ 239 240 Attributes: 241 coverage_context (coverage.Coverage): coverage.py management object. 242 """ 243 244 def __init__(self, id_map): 245 """See AugmentedResult.__init__.""" 246 super(CoverageResult, self).__init__(id_map=id_map) 247 self.coverage_context = None 248 249 def startTest(self, test): 250 """See unittest.TestResult.startTest. 251 252 Additionally initializes and begins code coverage tracking.""" 253 super(CoverageResult, self).startTest(test) 254 self.coverage_context = coverage.Coverage(data_suffix=True) 255 self.coverage_context.start() 256 257 def stopTest(self, test): 258 """See unittest.TestResult.stopTest. 259 260 Additionally stops and deinitializes code coverage tracking.""" 261 super(CoverageResult, self).stopTest(test) 262 self.coverage_context.stop() 263 self.coverage_context.save() 264 self.coverage_context = None 265 266 267class _Colors(object): 268 """Namespaced constants for terminal color magic numbers.""" 269 270 HEADER = "\033[95m" 271 INFO = "\033[94m" 272 OK = "\033[92m" 273 WARN = "\033[93m" 274 FAIL = "\033[91m" 275 BOLD = "\033[1m" 276 UNDERLINE = "\033[4m" 277 END = "\033[0m" 278 279 280class TerminalResult(CoverageResult): 281 """Extension to CoverageResult adding basic terminal reporting.""" 282 283 def __init__(self, out, id_map): 284 """Initialize the result object. 285 286 Args: 287 out (file-like): Output file to which terminal-colored live results will 288 be written. 289 id_map (callable): See AugmentedResult.__init__. 290 """ 291 super(TerminalResult, self).__init__(id_map=id_map) 292 self.out = out 293 294 def startTestRun(self): 295 """See unittest.TestResult.startTestRun.""" 296 super(TerminalResult, self).startTestRun() 297 self.out.write( 298 _Colors.HEADER + "Testing gRPC Python...\n" + _Colors.END 299 ) 300 301 def stopTestRun(self): 302 """See unittest.TestResult.stopTestRun.""" 303 super(TerminalResult, self).stopTestRun() 304 self.out.write(summary(self)) 305 self.out.flush() 306 307 def addError(self, test, err): 308 """See unittest.TestResult.addError.""" 309 super(TerminalResult, self).addError(test, err) 310 self.out.write( 311 _Colors.FAIL + "ERROR {}\n".format(test.id()) + _Colors.END 312 ) 313 self.out.flush() 314 315 def addFailure(self, test, err): 316 """See unittest.TestResult.addFailure.""" 317 super(TerminalResult, self).addFailure(test, err) 318 self.out.write( 319 _Colors.FAIL + "FAILURE {}\n".format(test.id()) + _Colors.END 320 ) 321 self.out.flush() 322 323 def addSuccess(self, test): 324 """See unittest.TestResult.addSuccess.""" 325 super(TerminalResult, self).addSuccess(test) 326 self.out.write( 327 _Colors.OK + "SUCCESS {}\n".format(test.id()) + _Colors.END 328 ) 329 self.out.flush() 330 331 def addSkip(self, test, reason): 332 """See unittest.TestResult.addSkip.""" 333 super(TerminalResult, self).addSkip(test, reason) 334 self.out.write( 335 _Colors.INFO + "SKIP {}\n".format(test.id()) + _Colors.END 336 ) 337 self.out.flush() 338 339 def addExpectedFailure(self, test, err): 340 """See unittest.TestResult.addExpectedFailure.""" 341 super(TerminalResult, self).addExpectedFailure(test, err) 342 self.out.write( 343 _Colors.INFO + "FAILURE_OK {}\n".format(test.id()) + _Colors.END 344 ) 345 self.out.flush() 346 347 def addUnexpectedSuccess(self, test): 348 """See unittest.TestResult.addUnexpectedSuccess.""" 349 super(TerminalResult, self).addUnexpectedSuccess(test) 350 self.out.write( 351 _Colors.INFO + "UNEXPECTED_OK {}\n".format(test.id()) + _Colors.END 352 ) 353 self.out.flush() 354 355 356def _traceback_string(type, value, trace): 357 """Generate a descriptive string of a Python exception traceback. 358 359 Args: 360 type (class): The type of the exception. 361 value (Exception): The value of the exception. 362 trace (traceback): Traceback of the exception. 363 364 Returns: 365 str: Formatted exception descriptive string. 366 """ 367 buffer = io.StringIO() 368 traceback.print_exception(type, value, trace, file=buffer) 369 return buffer.getvalue() 370 371 372def summary(result): 373 """A summary string of a result object. 374 375 Args: 376 result (AugmentedResult): The result object to get the summary of. 377 378 Returns: 379 str: The summary string. 380 """ 381 assert isinstance(result, AugmentedResult) 382 untested = list( 383 result.augmented_results( 384 lambda case_result: case_result.kind is CaseResult.Kind.UNTESTED 385 ) 386 ) 387 running = list( 388 result.augmented_results( 389 lambda case_result: case_result.kind is CaseResult.Kind.RUNNING 390 ) 391 ) 392 failures = list( 393 result.augmented_results( 394 lambda case_result: case_result.kind is CaseResult.Kind.FAILURE 395 ) 396 ) 397 errors = list( 398 result.augmented_results( 399 lambda case_result: case_result.kind is CaseResult.Kind.ERROR 400 ) 401 ) 402 successes = list( 403 result.augmented_results( 404 lambda case_result: case_result.kind is CaseResult.Kind.SUCCESS 405 ) 406 ) 407 skips = list( 408 result.augmented_results( 409 lambda case_result: case_result.kind is CaseResult.Kind.SKIP 410 ) 411 ) 412 expected_failures = list( 413 result.augmented_results( 414 lambda case_result: case_result.kind 415 is CaseResult.Kind.EXPECTED_FAILURE 416 ) 417 ) 418 unexpected_successes = list( 419 result.augmented_results( 420 lambda case_result: case_result.kind 421 is CaseResult.Kind.UNEXPECTED_SUCCESS 422 ) 423 ) 424 running_names = [case.name for case in running] 425 finished_count = ( 426 len(failures) 427 + len(errors) 428 + len(successes) 429 + len(expected_failures) 430 + len(unexpected_successes) 431 ) 432 statistics = ( 433 "{finished} tests finished:\n" 434 "\t{successful} successful\n" 435 "\t{unsuccessful} unsuccessful\n" 436 "\t{skipped} skipped\n" 437 "\t{expected_fail} expected failures\n" 438 "\t{unexpected_successful} unexpected successes\n" 439 "Interrupted Tests:\n" 440 "\t{interrupted}\n".format( 441 finished=finished_count, 442 successful=len(successes), 443 unsuccessful=(len(failures) + len(errors)), 444 skipped=len(skips), 445 expected_fail=len(expected_failures), 446 unexpected_successful=len(unexpected_successes), 447 interrupted=str(running_names), 448 ) 449 ) 450 tracebacks = "\n\n".join( 451 [ 452 ( 453 _Colors.FAIL 454 + "{test_name}" 455 + _Colors.END 456 + "\n" 457 + _Colors.BOLD 458 + "traceback:" 459 + _Colors.END 460 + "\n" 461 + "{traceback}\n" 462 + _Colors.BOLD 463 + "stdout:" 464 + _Colors.END 465 + "\n" 466 + "{stdout}\n" 467 + _Colors.BOLD 468 + "stderr:" 469 + _Colors.END 470 + "\n" 471 + "{stderr}\n" 472 ).format( 473 test_name=result.name, 474 traceback=_traceback_string(*result.traceback), 475 stdout=result.stdout, 476 stderr=result.stderr, 477 ) 478 for result in itertools.chain(failures, errors) 479 ] 480 ) 481 notes = "Unexpected successes: {}\n".format( 482 [result.name for result in unexpected_successes] 483 ) 484 return statistics + "\nErrors/Failures: \n" + tracebacks + "\n" + notes 485 486 487def jenkins_junit_xml(result): 488 """An XML tree object that when written is recognizable by Jenkins. 489 490 Args: 491 result (AugmentedResult): The result object to get the junit xml output of. 492 493 Returns: 494 ElementTree.ElementTree: The XML tree. 495 """ 496 assert isinstance(result, AugmentedResult) 497 root = ElementTree.Element("testsuites") 498 suite = ElementTree.SubElement( 499 root, 500 "testsuite", 501 { 502 "name": "Python gRPC tests", 503 }, 504 ) 505 for case in result.cases.values(): 506 if case.kind is CaseResult.Kind.SUCCESS: 507 ElementTree.SubElement( 508 suite, 509 "testcase", 510 { 511 "name": case.name, 512 }, 513 ) 514 elif case.kind in (CaseResult.Kind.ERROR, CaseResult.Kind.FAILURE): 515 case_xml = ElementTree.SubElement( 516 suite, 517 "testcase", 518 { 519 "name": case.name, 520 }, 521 ) 522 error_xml = ElementTree.SubElement(case_xml, "error", {}) 523 error_xml.text = "".format(case.stderr, case.traceback) 524 return ElementTree.ElementTree(element=root) 525