1# Run the _testcapi module tests (tests for the Python/C API): by defn, 2# these are all functions _testcapi exports whose name begins with 'test_'. 3 4from collections import OrderedDict 5import _thread 6import importlib.machinery 7import importlib.util 8import os 9import pickle 10import random 11import re 12import subprocess 13import sys 14import textwrap 15import threading 16import time 17import unittest 18import weakref 19from test import support 20from test.support import MISSING_C_DOCSTRINGS 21from test.support import import_helper 22from test.support import threading_helper 23from test.support import warnings_helper 24from test.support.script_helper import assert_python_failure, assert_python_ok 25try: 26 import _posixsubprocess 27except ImportError: 28 _posixsubprocess = None 29try: 30 import _testmultiphase 31except ImportError: 32 _testmultiphase = None 33 34# Skip this test if the _testcapi module isn't available. 35_testcapi = import_helper.import_module('_testcapi') 36 37import _testinternalcapi 38 39# Were we compiled --with-pydebug or with #define Py_DEBUG? 40Py_DEBUG = hasattr(sys, 'gettotalrefcount') 41 42 43def decode_stderr(err): 44 return err.decode('utf-8', 'replace').replace('\r', '') 45 46 47def testfunction(self): 48 """some doc""" 49 return self 50 51 52class InstanceMethod: 53 id = _testcapi.instancemethod(id) 54 testfunction = _testcapi.instancemethod(testfunction) 55 56class CAPITest(unittest.TestCase): 57 58 def test_instancemethod(self): 59 inst = InstanceMethod() 60 self.assertEqual(id(inst), inst.id()) 61 self.assertTrue(inst.testfunction() is inst) 62 self.assertEqual(inst.testfunction.__doc__, testfunction.__doc__) 63 self.assertEqual(InstanceMethod.testfunction.__doc__, testfunction.__doc__) 64 65 InstanceMethod.testfunction.attribute = "test" 66 self.assertEqual(testfunction.attribute, "test") 67 self.assertRaises(AttributeError, setattr, inst.testfunction, "attribute", "test") 68 69 @support.requires_subprocess() 70 def test_no_FatalError_infinite_loop(self): 71 with support.SuppressCrashReport(): 72 p = subprocess.Popen([sys.executable, "-c", 73 'import _testcapi;' 74 '_testcapi.crash_no_current_thread()'], 75 stdout=subprocess.PIPE, 76 stderr=subprocess.PIPE) 77 (out, err) = p.communicate() 78 self.assertEqual(out, b'') 79 # This used to cause an infinite loop. 80 self.assertTrue(err.rstrip().startswith( 81 b'Fatal Python error: ' 82 b'PyThreadState_Get: ' 83 b'the function must be called with the GIL held, ' 84 b'but the GIL is released ' 85 b'(the current Python thread state is NULL)'), 86 err) 87 88 def test_memoryview_from_NULL_pointer(self): 89 self.assertRaises(ValueError, _testcapi.make_memoryview_from_NULL_pointer) 90 91 def test_exception(self): 92 raised_exception = ValueError("5") 93 new_exc = TypeError("TEST") 94 try: 95 raise raised_exception 96 except ValueError as e: 97 orig_sys_exception = sys.exception() 98 orig_exception = _testcapi.set_exception(new_exc) 99 new_sys_exception = sys.exception() 100 new_exception = _testcapi.set_exception(orig_exception) 101 reset_sys_exception = sys.exception() 102 103 self.assertEqual(orig_exception, e) 104 105 self.assertEqual(orig_exception, raised_exception) 106 self.assertEqual(orig_sys_exception, orig_exception) 107 self.assertEqual(reset_sys_exception, orig_exception) 108 self.assertEqual(new_exception, new_exc) 109 self.assertEqual(new_sys_exception, new_exception) 110 else: 111 self.fail("Exception not raised") 112 113 def test_exc_info(self): 114 raised_exception = ValueError("5") 115 new_exc = TypeError("TEST") 116 try: 117 raise raised_exception 118 except ValueError as e: 119 tb = e.__traceback__ 120 orig_sys_exc_info = sys.exc_info() 121 orig_exc_info = _testcapi.set_exc_info(new_exc.__class__, new_exc, None) 122 new_sys_exc_info = sys.exc_info() 123 new_exc_info = _testcapi.set_exc_info(*orig_exc_info) 124 reset_sys_exc_info = sys.exc_info() 125 126 self.assertEqual(orig_exc_info[1], e) 127 128 self.assertSequenceEqual(orig_exc_info, (raised_exception.__class__, raised_exception, tb)) 129 self.assertSequenceEqual(orig_sys_exc_info, orig_exc_info) 130 self.assertSequenceEqual(reset_sys_exc_info, orig_exc_info) 131 self.assertSequenceEqual(new_exc_info, (new_exc.__class__, new_exc, None)) 132 self.assertSequenceEqual(new_sys_exc_info, new_exc_info) 133 else: 134 self.assertTrue(False) 135 136 def test_set_object(self): 137 # new exception as obj is not an exception 138 with self.assertRaises(ValueError) as e: 139 _testcapi.exc_set_object(ValueError, 42) 140 self.assertEqual(e.exception.args, (42,)) 141 142 # wraps the exception because unrelated types 143 with self.assertRaises(ValueError) as e: 144 _testcapi.exc_set_object(ValueError, TypeError(1,2,3)) 145 wrapped = e.exception.args[0] 146 self.assertIsInstance(wrapped, TypeError) 147 self.assertEqual(wrapped.args, (1, 2, 3)) 148 149 # is superclass, so does not wrap 150 with self.assertRaises(PermissionError) as e: 151 _testcapi.exc_set_object(OSError, PermissionError(24)) 152 self.assertEqual(e.exception.args, (24,)) 153 154 class Meta(type): 155 def __subclasscheck__(cls, sub): 156 1/0 157 158 class Broken(Exception, metaclass=Meta): 159 pass 160 161 with self.assertRaises(ZeroDivisionError) as e: 162 _testcapi.exc_set_object(Broken, Broken()) 163 164 @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.') 165 def test_seq_bytes_to_charp_array(self): 166 # Issue #15732: crash in _PySequence_BytesToCharpArray() 167 class Z(object): 168 def __len__(self): 169 return 1 170 with self.assertRaisesRegex(TypeError, 'indexing'): 171 _posixsubprocess.fork_exec( 172 1,Z(),True,(1, 2),5,6,7,8,9,10,11,12,13,14,True,True,17,False,19,20,21,22,False) 173 # Issue #15736: overflow in _PySequence_BytesToCharpArray() 174 class Z(object): 175 def __len__(self): 176 return sys.maxsize 177 def __getitem__(self, i): 178 return b'x' 179 self.assertRaises(MemoryError, _posixsubprocess.fork_exec, 180 1,Z(),True,(1, 2),5,6,7,8,9,10,11,12,13,14,True,True,17,False,19,20,21,22,False) 181 182 @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.') 183 def test_subprocess_fork_exec(self): 184 class Z(object): 185 def __len__(self): 186 return 1 187 188 # Issue #15738: crash in subprocess_fork_exec() 189 self.assertRaises(TypeError, _posixsubprocess.fork_exec, 190 Z(),[b'1'],True,(1, 2),5,6,7,8,9,10,11,12,13,14,True,True,17,False,19,20,21,22,False) 191 192 @unittest.skipIf(MISSING_C_DOCSTRINGS, 193 "Signature information for builtins requires docstrings") 194 def test_docstring_signature_parsing(self): 195 196 self.assertEqual(_testcapi.no_docstring.__doc__, None) 197 self.assertEqual(_testcapi.no_docstring.__text_signature__, None) 198 199 self.assertEqual(_testcapi.docstring_empty.__doc__, None) 200 self.assertEqual(_testcapi.docstring_empty.__text_signature__, None) 201 202 self.assertEqual(_testcapi.docstring_no_signature.__doc__, 203 "This docstring has no signature.") 204 self.assertEqual(_testcapi.docstring_no_signature.__text_signature__, None) 205 206 self.assertEqual(_testcapi.docstring_with_invalid_signature.__doc__, 207 "docstring_with_invalid_signature($module, /, boo)\n" 208 "\n" 209 "This docstring has an invalid signature." 210 ) 211 self.assertEqual(_testcapi.docstring_with_invalid_signature.__text_signature__, None) 212 213 self.assertEqual(_testcapi.docstring_with_invalid_signature2.__doc__, 214 "docstring_with_invalid_signature2($module, /, boo)\n" 215 "\n" 216 "--\n" 217 "\n" 218 "This docstring also has an invalid signature." 219 ) 220 self.assertEqual(_testcapi.docstring_with_invalid_signature2.__text_signature__, None) 221 222 self.assertEqual(_testcapi.docstring_with_signature.__doc__, 223 "This docstring has a valid signature.") 224 self.assertEqual(_testcapi.docstring_with_signature.__text_signature__, "($module, /, sig)") 225 226 self.assertEqual(_testcapi.docstring_with_signature_but_no_doc.__doc__, None) 227 self.assertEqual(_testcapi.docstring_with_signature_but_no_doc.__text_signature__, 228 "($module, /, sig)") 229 230 self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__doc__, 231 "\nThis docstring has a valid signature and some extra newlines.") 232 self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__text_signature__, 233 "($module, /, parameter)") 234 235 def test_c_type_with_matrix_multiplication(self): 236 M = _testcapi.matmulType 237 m1 = M() 238 m2 = M() 239 self.assertEqual(m1 @ m2, ("matmul", m1, m2)) 240 self.assertEqual(m1 @ 42, ("matmul", m1, 42)) 241 self.assertEqual(42 @ m1, ("matmul", 42, m1)) 242 o = m1 243 o @= m2 244 self.assertEqual(o, ("imatmul", m1, m2)) 245 o = m1 246 o @= 42 247 self.assertEqual(o, ("imatmul", m1, 42)) 248 o = 42 249 o @= m1 250 self.assertEqual(o, ("matmul", 42, m1)) 251 252 def test_c_type_with_ipow(self): 253 # When the __ipow__ method of a type was implemented in C, using the 254 # modulo param would cause segfaults. 255 o = _testcapi.ipowType() 256 self.assertEqual(o.__ipow__(1), (1, None)) 257 self.assertEqual(o.__ipow__(2, 2), (2, 2)) 258 259 def test_return_null_without_error(self): 260 # Issue #23571: A function must not return NULL without setting an 261 # error 262 if Py_DEBUG: 263 code = textwrap.dedent(""" 264 import _testcapi 265 from test import support 266 267 with support.SuppressCrashReport(): 268 _testcapi.return_null_without_error() 269 """) 270 rc, out, err = assert_python_failure('-c', code) 271 err = decode_stderr(err) 272 self.assertRegex(err, 273 r'Fatal Python error: _Py_CheckFunctionResult: ' 274 r'a function returned NULL without setting an exception\n' 275 r'Python runtime state: initialized\n' 276 r'SystemError: <built-in function return_null_without_error> ' 277 r'returned NULL without setting an exception\n' 278 r'\n' 279 r'Current thread.*:\n' 280 r' File .*", line 6 in <module>\n') 281 else: 282 with self.assertRaises(SystemError) as cm: 283 _testcapi.return_null_without_error() 284 self.assertRegex(str(cm.exception), 285 'return_null_without_error.* ' 286 'returned NULL without setting an exception') 287 288 def test_return_result_with_error(self): 289 # Issue #23571: A function must not return a result with an error set 290 if Py_DEBUG: 291 code = textwrap.dedent(""" 292 import _testcapi 293 from test import support 294 295 with support.SuppressCrashReport(): 296 _testcapi.return_result_with_error() 297 """) 298 rc, out, err = assert_python_failure('-c', code) 299 err = decode_stderr(err) 300 self.assertRegex(err, 301 r'Fatal Python error: _Py_CheckFunctionResult: ' 302 r'a function returned a result with an exception set\n' 303 r'Python runtime state: initialized\n' 304 r'ValueError\n' 305 r'\n' 306 r'The above exception was the direct cause ' 307 r'of the following exception:\n' 308 r'\n' 309 r'SystemError: <built-in ' 310 r'function return_result_with_error> ' 311 r'returned a result with an exception set\n' 312 r'\n' 313 r'Current thread.*:\n' 314 r' File .*, line 6 in <module>\n') 315 else: 316 with self.assertRaises(SystemError) as cm: 317 _testcapi.return_result_with_error() 318 self.assertRegex(str(cm.exception), 319 'return_result_with_error.* ' 320 'returned a result with an exception set') 321 322 def test_getitem_with_error(self): 323 # Test _Py_CheckSlotResult(). Raise an exception and then calls 324 # PyObject_GetItem(): check that the assertion catches the bug. 325 # PyObject_GetItem() must not be called with an exception set. 326 code = textwrap.dedent(""" 327 import _testcapi 328 from test import support 329 330 with support.SuppressCrashReport(): 331 _testcapi.getitem_with_error({1: 2}, 1) 332 """) 333 rc, out, err = assert_python_failure('-c', code) 334 err = decode_stderr(err) 335 if 'SystemError: ' not in err: 336 self.assertRegex(err, 337 r'Fatal Python error: _Py_CheckSlotResult: ' 338 r'Slot __getitem__ of type dict succeeded ' 339 r'with an exception set\n' 340 r'Python runtime state: initialized\n' 341 r'ValueError: bug\n' 342 r'\n' 343 r'Current thread .* \(most recent call first\):\n' 344 r' File .*, line 6 in <module>\n' 345 r'\n' 346 r'Extension modules: _testcapi \(total: 1\)\n') 347 else: 348 # Python built with NDEBUG macro defined: 349 # test _Py_CheckFunctionResult() instead. 350 self.assertIn('returned a result with an exception set', err) 351 352 def test_buildvalue_N(self): 353 _testcapi.test_buildvalue_N() 354 355 def test_set_nomemory(self): 356 code = """if 1: 357 import _testcapi 358 359 class C(): pass 360 361 # The first loop tests both functions and that remove_mem_hooks() 362 # can be called twice in a row. The second loop checks a call to 363 # set_nomemory() after a call to remove_mem_hooks(). The third 364 # loop checks the start and stop arguments of set_nomemory(). 365 for outer_cnt in range(1, 4): 366 start = 10 * outer_cnt 367 for j in range(100): 368 if j == 0: 369 if outer_cnt != 3: 370 _testcapi.set_nomemory(start) 371 else: 372 _testcapi.set_nomemory(start, start + 1) 373 try: 374 C() 375 except MemoryError as e: 376 if outer_cnt != 3: 377 _testcapi.remove_mem_hooks() 378 print('MemoryError', outer_cnt, j) 379 _testcapi.remove_mem_hooks() 380 break 381 """ 382 rc, out, err = assert_python_ok('-c', code) 383 lines = out.splitlines() 384 for i, line in enumerate(lines, 1): 385 self.assertIn(b'MemoryError', out) 386 *_, count = line.split(b' ') 387 count = int(count) 388 self.assertLessEqual(count, i*5) 389 self.assertGreaterEqual(count, i*5-2) 390 391 def test_mapping_keys_values_items(self): 392 class Mapping1(dict): 393 def keys(self): 394 return list(super().keys()) 395 def values(self): 396 return list(super().values()) 397 def items(self): 398 return list(super().items()) 399 class Mapping2(dict): 400 def keys(self): 401 return tuple(super().keys()) 402 def values(self): 403 return tuple(super().values()) 404 def items(self): 405 return tuple(super().items()) 406 dict_obj = {'foo': 1, 'bar': 2, 'spam': 3} 407 408 for mapping in [{}, OrderedDict(), Mapping1(), Mapping2(), 409 dict_obj, OrderedDict(dict_obj), 410 Mapping1(dict_obj), Mapping2(dict_obj)]: 411 self.assertListEqual(_testcapi.get_mapping_keys(mapping), 412 list(mapping.keys())) 413 self.assertListEqual(_testcapi.get_mapping_values(mapping), 414 list(mapping.values())) 415 self.assertListEqual(_testcapi.get_mapping_items(mapping), 416 list(mapping.items())) 417 418 def test_mapping_keys_values_items_bad_arg(self): 419 self.assertRaises(AttributeError, _testcapi.get_mapping_keys, None) 420 self.assertRaises(AttributeError, _testcapi.get_mapping_values, None) 421 self.assertRaises(AttributeError, _testcapi.get_mapping_items, None) 422 423 class BadMapping: 424 def keys(self): 425 return None 426 def values(self): 427 return None 428 def items(self): 429 return None 430 bad_mapping = BadMapping() 431 self.assertRaises(TypeError, _testcapi.get_mapping_keys, bad_mapping) 432 self.assertRaises(TypeError, _testcapi.get_mapping_values, bad_mapping) 433 self.assertRaises(TypeError, _testcapi.get_mapping_items, bad_mapping) 434 435 def test_mapping_has_key(self): 436 dct = {'a': 1} 437 self.assertTrue(_testcapi.mapping_has_key(dct, 'a')) 438 self.assertFalse(_testcapi.mapping_has_key(dct, 'b')) 439 440 class SubDict(dict): 441 pass 442 443 dct2 = SubDict({'a': 1}) 444 self.assertTrue(_testcapi.mapping_has_key(dct2, 'a')) 445 self.assertFalse(_testcapi.mapping_has_key(dct2, 'b')) 446 447 def test_sequence_set_slice(self): 448 # Correct case: 449 data = [1, 2, 3, 4, 5] 450 data_copy = data.copy() 451 452 _testcapi.sequence_set_slice(data, 1, 3, [8, 9]) 453 data_copy[1:3] = [8, 9] 454 self.assertEqual(data, data_copy) 455 self.assertEqual(data, [1, 8, 9, 4, 5]) 456 457 # Custom class: 458 class Custom: 459 def __setitem__(self, index, value): 460 self.index = index 461 self.value = value 462 463 c = Custom() 464 _testcapi.sequence_set_slice(c, 0, 5, 'abc') 465 self.assertEqual(c.index, slice(0, 5)) 466 self.assertEqual(c.value, 'abc') 467 468 # Immutable sequences must raise: 469 bad_seq1 = (1, 2, 3, 4) 470 with self.assertRaises(TypeError): 471 _testcapi.sequence_set_slice(bad_seq1, 1, 3, (8, 9)) 472 self.assertEqual(bad_seq1, (1, 2, 3, 4)) 473 474 bad_seq2 = 'abcd' 475 with self.assertRaises(TypeError): 476 _testcapi.sequence_set_slice(bad_seq2, 1, 3, 'xy') 477 self.assertEqual(bad_seq2, 'abcd') 478 479 # Not a sequence: 480 with self.assertRaises(TypeError): 481 _testcapi.sequence_set_slice(None, 1, 3, 'xy') 482 483 mapping = {1: 'a', 2: 'b', 3: 'c'} 484 with self.assertRaises(TypeError): 485 _testcapi.sequence_set_slice(mapping, 1, 3, 'xy') 486 self.assertEqual(mapping, {1: 'a', 2: 'b', 3: 'c'}) 487 488 def test_sequence_del_slice(self): 489 # Correct case: 490 data = [1, 2, 3, 4, 5] 491 data_copy = data.copy() 492 493 _testcapi.sequence_del_slice(data, 1, 3) 494 del data_copy[1:3] 495 self.assertEqual(data, data_copy) 496 self.assertEqual(data, [1, 4, 5]) 497 498 # Custom class: 499 class Custom: 500 def __delitem__(self, index): 501 self.index = index 502 503 c = Custom() 504 _testcapi.sequence_del_slice(c, 0, 5) 505 self.assertEqual(c.index, slice(0, 5)) 506 507 # Immutable sequences must raise: 508 bad_seq1 = (1, 2, 3, 4) 509 with self.assertRaises(TypeError): 510 _testcapi.sequence_del_slice(bad_seq1, 1, 3) 511 self.assertEqual(bad_seq1, (1, 2, 3, 4)) 512 513 bad_seq2 = 'abcd' 514 with self.assertRaises(TypeError): 515 _testcapi.sequence_del_slice(bad_seq2, 1, 3) 516 self.assertEqual(bad_seq2, 'abcd') 517 518 # Not a sequence: 519 with self.assertRaises(TypeError): 520 _testcapi.sequence_del_slice(None, 1, 3) 521 522 mapping = {1: 'a', 2: 'b', 3: 'c'} 523 with self.assertRaises(TypeError): 524 _testcapi.sequence_del_slice(mapping, 1, 3) 525 self.assertEqual(mapping, {1: 'a', 2: 'b', 3: 'c'}) 526 527 @unittest.skipUnless(hasattr(_testcapi, 'negative_refcount'), 528 'need _testcapi.negative_refcount') 529 def test_negative_refcount(self): 530 # bpo-35059: Check that Py_DECREF() reports the correct filename 531 # when calling _Py_NegativeRefcount() to abort Python. 532 code = textwrap.dedent(""" 533 import _testcapi 534 from test import support 535 536 with support.SuppressCrashReport(): 537 _testcapi.negative_refcount() 538 """) 539 rc, out, err = assert_python_failure('-c', code) 540 self.assertRegex(err, 541 br'_testcapimodule\.c:[0-9]+: ' 542 br'_Py_NegativeRefcount: Assertion failed: ' 543 br'object has negative ref count') 544 545 def test_trashcan_subclass(self): 546 # bpo-35983: Check that the trashcan mechanism for "list" is NOT 547 # activated when its tp_dealloc is being called by a subclass 548 from _testcapi import MyList 549 L = None 550 for i in range(1000): 551 L = MyList((L,)) 552 553 @support.requires_resource('cpu') 554 def test_trashcan_python_class1(self): 555 self.do_test_trashcan_python_class(list) 556 557 @support.requires_resource('cpu') 558 def test_trashcan_python_class2(self): 559 from _testcapi import MyList 560 self.do_test_trashcan_python_class(MyList) 561 562 def do_test_trashcan_python_class(self, base): 563 # Check that the trashcan mechanism works properly for a Python 564 # subclass of a class using the trashcan (this specific test assumes 565 # that the base class "base" behaves like list) 566 class PyList(base): 567 # Count the number of PyList instances to verify that there is 568 # no memory leak 569 num = 0 570 def __init__(self, *args): 571 __class__.num += 1 572 super().__init__(*args) 573 def __del__(self): 574 __class__.num -= 1 575 576 for parity in (0, 1): 577 L = None 578 # We need in the order of 2**20 iterations here such that a 579 # typical 8MB stack would overflow without the trashcan. 580 for i in range(2**20): 581 L = PyList((L,)) 582 L.attr = i 583 if parity: 584 # Add one additional nesting layer 585 L = (L,) 586 self.assertGreater(PyList.num, 0) 587 del L 588 self.assertEqual(PyList.num, 0) 589 590 def test_heap_ctype_doc_and_text_signature(self): 591 self.assertEqual(_testcapi.HeapDocCType.__doc__, "somedoc") 592 self.assertEqual(_testcapi.HeapDocCType.__text_signature__, "(arg1, arg2)") 593 594 def test_null_type_doc(self): 595 self.assertEqual(_testcapi.NullTpDocType.__doc__, None) 596 597 def test_subclass_of_heap_gc_ctype_with_tpdealloc_decrefs_once(self): 598 class HeapGcCTypeSubclass(_testcapi.HeapGcCType): 599 def __init__(self): 600 self.value2 = 20 601 super().__init__() 602 603 subclass_instance = HeapGcCTypeSubclass() 604 type_refcnt = sys.getrefcount(HeapGcCTypeSubclass) 605 606 # Test that subclass instance was fully created 607 self.assertEqual(subclass_instance.value, 10) 608 self.assertEqual(subclass_instance.value2, 20) 609 610 # Test that the type reference count is only decremented once 611 del subclass_instance 612 self.assertEqual(type_refcnt - 1, sys.getrefcount(HeapGcCTypeSubclass)) 613 614 def test_subclass_of_heap_gc_ctype_with_del_modifying_dunder_class_only_decrefs_once(self): 615 class A(_testcapi.HeapGcCType): 616 def __init__(self): 617 self.value2 = 20 618 super().__init__() 619 620 class B(A): 621 def __init__(self): 622 super().__init__() 623 624 def __del__(self): 625 self.__class__ = A 626 A.refcnt_in_del = sys.getrefcount(A) 627 B.refcnt_in_del = sys.getrefcount(B) 628 629 subclass_instance = B() 630 type_refcnt = sys.getrefcount(B) 631 new_type_refcnt = sys.getrefcount(A) 632 633 # Test that subclass instance was fully created 634 self.assertEqual(subclass_instance.value, 10) 635 self.assertEqual(subclass_instance.value2, 20) 636 637 del subclass_instance 638 639 # Test that setting __class__ modified the reference counts of the types 640 if Py_DEBUG: 641 # gh-89373: In debug mode, _Py_Dealloc() keeps a strong reference 642 # to the type while calling tp_dealloc() 643 self.assertEqual(type_refcnt, B.refcnt_in_del) 644 else: 645 self.assertEqual(type_refcnt - 1, B.refcnt_in_del) 646 self.assertEqual(new_type_refcnt + 1, A.refcnt_in_del) 647 648 # Test that the original type already has decreased its refcnt 649 self.assertEqual(type_refcnt - 1, sys.getrefcount(B)) 650 651 # Test that subtype_dealloc decref the newly assigned __class__ only once 652 self.assertEqual(new_type_refcnt, sys.getrefcount(A)) 653 654 def test_heaptype_with_dict(self): 655 inst = _testcapi.HeapCTypeWithDict() 656 inst.foo = 42 657 self.assertEqual(inst.foo, 42) 658 self.assertEqual(inst.dictobj, inst.__dict__) 659 self.assertEqual(inst.dictobj, {"foo": 42}) 660 661 inst = _testcapi.HeapCTypeWithDict() 662 self.assertEqual({}, inst.__dict__) 663 664 def test_heaptype_with_negative_dict(self): 665 inst = _testcapi.HeapCTypeWithNegativeDict() 666 inst.foo = 42 667 self.assertEqual(inst.foo, 42) 668 self.assertEqual(inst.dictobj, inst.__dict__) 669 self.assertEqual(inst.dictobj, {"foo": 42}) 670 671 inst = _testcapi.HeapCTypeWithNegativeDict() 672 self.assertEqual({}, inst.__dict__) 673 674 def test_heaptype_with_weakref(self): 675 inst = _testcapi.HeapCTypeWithWeakref() 676 ref = weakref.ref(inst) 677 self.assertEqual(ref(), inst) 678 self.assertEqual(inst.weakreflist, ref) 679 680 def test_heaptype_with_buffer(self): 681 inst = _testcapi.HeapCTypeWithBuffer() 682 b = bytes(inst) 683 self.assertEqual(b, b"1234") 684 685 def test_c_subclass_of_heap_ctype_with_tpdealloc_decrefs_once(self): 686 subclass_instance = _testcapi.HeapCTypeSubclass() 687 type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclass) 688 689 # Test that subclass instance was fully created 690 self.assertEqual(subclass_instance.value, 10) 691 self.assertEqual(subclass_instance.value2, 20) 692 693 # Test that the type reference count is only decremented once 694 del subclass_instance 695 self.assertEqual(type_refcnt - 1, sys.getrefcount(_testcapi.HeapCTypeSubclass)) 696 697 def test_c_subclass_of_heap_ctype_with_del_modifying_dunder_class_only_decrefs_once(self): 698 subclass_instance = _testcapi.HeapCTypeSubclassWithFinalizer() 699 type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclassWithFinalizer) 700 new_type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclass) 701 702 # Test that subclass instance was fully created 703 self.assertEqual(subclass_instance.value, 10) 704 self.assertEqual(subclass_instance.value2, 20) 705 706 # The tp_finalize slot will set __class__ to HeapCTypeSubclass 707 del subclass_instance 708 709 # Test that setting __class__ modified the reference counts of the types 710 if Py_DEBUG: 711 # gh-89373: In debug mode, _Py_Dealloc() keeps a strong reference 712 # to the type while calling tp_dealloc() 713 self.assertEqual(type_refcnt, _testcapi.HeapCTypeSubclassWithFinalizer.refcnt_in_del) 714 else: 715 self.assertEqual(type_refcnt - 1, _testcapi.HeapCTypeSubclassWithFinalizer.refcnt_in_del) 716 self.assertEqual(new_type_refcnt + 1, _testcapi.HeapCTypeSubclass.refcnt_in_del) 717 718 # Test that the original type already has decreased its refcnt 719 self.assertEqual(type_refcnt - 1, sys.getrefcount(_testcapi.HeapCTypeSubclassWithFinalizer)) 720 721 # Test that subtype_dealloc decref the newly assigned __class__ only once 722 self.assertEqual(new_type_refcnt, sys.getrefcount(_testcapi.HeapCTypeSubclass)) 723 724 def test_heaptype_with_setattro(self): 725 obj = _testcapi.HeapCTypeSetattr() 726 self.assertEqual(obj.pvalue, 10) 727 obj.value = 12 728 self.assertEqual(obj.pvalue, 12) 729 del obj.value 730 self.assertEqual(obj.pvalue, 0) 731 732 def test_multiple_inheritance_ctypes_with_weakref_or_dict(self): 733 734 class Both1(_testcapi.HeapCTypeWithWeakref, _testcapi.HeapCTypeWithDict): 735 pass 736 class Both2(_testcapi.HeapCTypeWithDict, _testcapi.HeapCTypeWithWeakref): 737 pass 738 739 for cls in (_testcapi.HeapCTypeWithDict, _testcapi.HeapCTypeWithDict2, 740 _testcapi.HeapCTypeWithWeakref, _testcapi.HeapCTypeWithWeakref2): 741 for cls2 in (_testcapi.HeapCTypeWithDict, _testcapi.HeapCTypeWithDict2, 742 _testcapi.HeapCTypeWithWeakref, _testcapi.HeapCTypeWithWeakref2): 743 if cls is not cls2: 744 class S(cls, cls2): 745 pass 746 class B1(Both1, cls): 747 pass 748 class B2(Both1, cls): 749 pass 750 751 def test_pynumber_tobase(self): 752 from _testcapi import pynumber_tobase 753 self.assertEqual(pynumber_tobase(123, 2), '0b1111011') 754 self.assertEqual(pynumber_tobase(123, 8), '0o173') 755 self.assertEqual(pynumber_tobase(123, 10), '123') 756 self.assertEqual(pynumber_tobase(123, 16), '0x7b') 757 self.assertEqual(pynumber_tobase(-123, 2), '-0b1111011') 758 self.assertEqual(pynumber_tobase(-123, 8), '-0o173') 759 self.assertEqual(pynumber_tobase(-123, 10), '-123') 760 self.assertEqual(pynumber_tobase(-123, 16), '-0x7b') 761 self.assertRaises(TypeError, pynumber_tobase, 123.0, 10) 762 self.assertRaises(TypeError, pynumber_tobase, '123', 10) 763 self.assertRaises(SystemError, pynumber_tobase, 123, 0) 764 765 def check_fatal_error(self, code, expected, not_expected=()): 766 with support.SuppressCrashReport(): 767 rc, out, err = assert_python_failure('-sSI', '-c', code) 768 769 err = decode_stderr(err) 770 self.assertIn('Fatal Python error: test_fatal_error: MESSAGE\n', 771 err) 772 773 match = re.search(r'^Extension modules:(.*) \(total: ([0-9]+)\)$', 774 err, re.MULTILINE) 775 if not match: 776 self.fail(f"Cannot find 'Extension modules:' in {err!r}") 777 modules = set(match.group(1).strip().split(', ')) 778 total = int(match.group(2)) 779 780 for name in expected: 781 self.assertIn(name, modules) 782 for name in not_expected: 783 self.assertNotIn(name, modules) 784 self.assertEqual(len(modules), total) 785 786 @support.requires_subprocess() 787 def test_fatal_error(self): 788 # By default, stdlib extension modules are ignored, 789 # but not test modules. 790 expected = ('_testcapi',) 791 not_expected = ('sys',) 792 code = 'import _testcapi, sys; _testcapi.fatal_error(b"MESSAGE")' 793 self.check_fatal_error(code, expected, not_expected) 794 795 # Mark _testcapi as stdlib module, but not sys 796 expected = ('sys',) 797 not_expected = ('_testcapi',) 798 code = textwrap.dedent(''' 799 import _testcapi, sys 800 sys.stdlib_module_names = frozenset({"_testcapi"}) 801 _testcapi.fatal_error(b"MESSAGE") 802 ''') 803 self.check_fatal_error(code, expected) 804 805 def test_pyobject_repr_from_null(self): 806 s = _testcapi.pyobject_repr_from_null() 807 self.assertEqual(s, '<NULL>') 808 809 def test_pyobject_str_from_null(self): 810 s = _testcapi.pyobject_str_from_null() 811 self.assertEqual(s, '<NULL>') 812 813 def test_pyobject_bytes_from_null(self): 814 s = _testcapi.pyobject_bytes_from_null() 815 self.assertEqual(s, b'<NULL>') 816 817 def test_Py_CompileString(self): 818 # Check that Py_CompileString respects the coding cookie 819 _compile = _testcapi.Py_CompileString 820 code = b"# -*- coding: latin1 -*-\nprint('\xc2\xa4')\n" 821 result = _compile(code) 822 expected = compile(code, "<string>", "exec") 823 self.assertEqual(result.co_consts, expected.co_consts) 824 825 def test_export_symbols(self): 826 # bpo-44133: Ensure that the "Py_FrozenMain" and 827 # "PyThread_get_thread_native_id" symbols are exported by the Python 828 # (directly by the binary, or via by the Python dynamic library). 829 ctypes = import_helper.import_module('ctypes') 830 names = [] 831 832 # Test if the PY_HAVE_THREAD_NATIVE_ID macro is defined 833 if hasattr(_thread, 'get_native_id'): 834 names.append('PyThread_get_thread_native_id') 835 836 # Python/frozenmain.c fails to build on Windows when the symbols are 837 # missing: 838 # - PyWinFreeze_ExeInit 839 # - PyWinFreeze_ExeTerm 840 # - PyInitFrozenExtensions 841 if os.name != 'nt': 842 names.append('Py_FrozenMain') 843 844 for name in names: 845 with self.subTest(name=name): 846 self.assertTrue(hasattr(ctypes.pythonapi, name)) 847 848 def test_eval_get_func_name(self): 849 def function_example(): ... 850 851 class A: 852 def method_example(self): ... 853 854 self.assertEqual(_testcapi.eval_get_func_name(function_example), 855 "function_example") 856 self.assertEqual(_testcapi.eval_get_func_name(A.method_example), 857 "method_example") 858 self.assertEqual(_testcapi.eval_get_func_name(A().method_example), 859 "method_example") 860 self.assertEqual(_testcapi.eval_get_func_name(sum), "sum") # c function 861 self.assertEqual(_testcapi.eval_get_func_name(A), "type") 862 863 def test_eval_get_func_desc(self): 864 def function_example(): ... 865 866 class A: 867 def method_example(self): ... 868 869 self.assertEqual(_testcapi.eval_get_func_desc(function_example), 870 "()") 871 self.assertEqual(_testcapi.eval_get_func_desc(A.method_example), 872 "()") 873 self.assertEqual(_testcapi.eval_get_func_desc(A().method_example), 874 "()") 875 self.assertEqual(_testcapi.eval_get_func_desc(sum), "()") # c function 876 self.assertEqual(_testcapi.eval_get_func_desc(A), " object") 877 878 def test_function_get_code(self): 879 import types 880 881 def some(): 882 pass 883 884 code = _testcapi.function_get_code(some) 885 self.assertIsInstance(code, types.CodeType) 886 self.assertEqual(code, some.__code__) 887 888 with self.assertRaises(SystemError): 889 _testcapi.function_get_code(None) # not a function 890 891 def test_function_get_globals(self): 892 def some(): 893 pass 894 895 globals_ = _testcapi.function_get_globals(some) 896 self.assertIsInstance(globals_, dict) 897 self.assertEqual(globals_, some.__globals__) 898 899 with self.assertRaises(SystemError): 900 _testcapi.function_get_globals(None) # not a function 901 902 def test_function_get_module(self): 903 def some(): 904 pass 905 906 module = _testcapi.function_get_module(some) 907 self.assertIsInstance(module, str) 908 self.assertEqual(module, some.__module__) 909 910 with self.assertRaises(SystemError): 911 _testcapi.function_get_module(None) # not a function 912 913 914class TestPendingCalls(unittest.TestCase): 915 916 def pendingcalls_submit(self, l, n): 917 def callback(): 918 #this function can be interrupted by thread switching so let's 919 #use an atomic operation 920 l.append(None) 921 922 for i in range(n): 923 time.sleep(random.random()*0.02) #0.01 secs on average 924 #try submitting callback until successful. 925 #rely on regular interrupt to flush queue if we are 926 #unsuccessful. 927 while True: 928 if _testcapi._pending_threadfunc(callback): 929 break 930 931 def pendingcalls_wait(self, l, n, context = None): 932 #now, stick around until l[0] has grown to 10 933 count = 0 934 while len(l) != n: 935 #this busy loop is where we expect to be interrupted to 936 #run our callbacks. Note that callbacks are only run on the 937 #main thread 938 if False and support.verbose: 939 print("(%i)"%(len(l),),) 940 for i in range(1000): 941 a = i*i 942 if context and not context.event.is_set(): 943 continue 944 count += 1 945 self.assertTrue(count < 10000, 946 "timeout waiting for %i callbacks, got %i"%(n, len(l))) 947 if False and support.verbose: 948 print("(%i)"%(len(l),)) 949 950 @threading_helper.requires_working_threading() 951 def test_pendingcalls_threaded(self): 952 953 #do every callback on a separate thread 954 n = 32 #total callbacks 955 threads = [] 956 class foo(object):pass 957 context = foo() 958 context.l = [] 959 context.n = 2 #submits per thread 960 context.nThreads = n // context.n 961 context.nFinished = 0 962 context.lock = threading.Lock() 963 context.event = threading.Event() 964 965 threads = [threading.Thread(target=self.pendingcalls_thread, 966 args=(context,)) 967 for i in range(context.nThreads)] 968 with threading_helper.start_threads(threads): 969 self.pendingcalls_wait(context.l, n, context) 970 971 def pendingcalls_thread(self, context): 972 try: 973 self.pendingcalls_submit(context.l, context.n) 974 finally: 975 with context.lock: 976 context.nFinished += 1 977 nFinished = context.nFinished 978 if False and support.verbose: 979 print("finished threads: ", nFinished) 980 if nFinished == context.nThreads: 981 context.event.set() 982 983 def test_pendingcalls_non_threaded(self): 984 #again, just using the main thread, likely they will all be dispatched at 985 #once. It is ok to ask for too many, because we loop until we find a slot. 986 #the loop can be interrupted to dispatch. 987 #there are only 32 dispatch slots, so we go for twice that! 988 l = [] 989 n = 64 990 self.pendingcalls_submit(l, n) 991 self.pendingcalls_wait(l, n) 992 993 994class SubinterpreterTest(unittest.TestCase): 995 996 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") 997 def test_subinterps(self): 998 import builtins 999 r, w = os.pipe() 1000 code = """if 1: 1001 import sys, builtins, pickle 1002 with open({:d}, "wb") as f: 1003 pickle.dump(id(sys.modules), f) 1004 pickle.dump(id(builtins), f) 1005 """.format(w) 1006 with open(r, "rb") as f: 1007 ret = support.run_in_subinterp(code) 1008 self.assertEqual(ret, 0) 1009 self.assertNotEqual(pickle.load(f), id(sys.modules)) 1010 self.assertNotEqual(pickle.load(f), id(builtins)) 1011 1012 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") 1013 def test_subinterps_recent_language_features(self): 1014 r, w = os.pipe() 1015 code = """if 1: 1016 import pickle 1017 with open({:d}, "wb") as f: 1018 1019 @(lambda x:x) # Py 3.9 1020 def noop(x): return x 1021 1022 a = (b := f'1{{2}}3') + noop('x') # Py 3.8 (:=) / 3.6 (f'') 1023 1024 async def foo(arg): return await arg # Py 3.5 1025 1026 pickle.dump(dict(a=a, b=b), f) 1027 """.format(w) 1028 1029 with open(r, "rb") as f: 1030 ret = support.run_in_subinterp(code) 1031 self.assertEqual(ret, 0) 1032 self.assertEqual(pickle.load(f), {'a': '123x', 'b': '123'}) 1033 1034 def test_mutate_exception(self): 1035 """ 1036 Exceptions saved in global module state get shared between 1037 individual module instances. This test checks whether or not 1038 a change in one interpreter's module gets reflected into the 1039 other ones. 1040 """ 1041 import binascii 1042 1043 support.run_in_subinterp("import binascii; binascii.Error.foobar = 'foobar'") 1044 1045 self.assertFalse(hasattr(binascii.Error, "foobar")) 1046 1047 @unittest.skipIf(_testmultiphase is None, "test requires _testmultiphase module") 1048 def test_module_state_shared_in_global(self): 1049 """ 1050 bpo-44050: Extension module state should be shared between interpreters 1051 when it doesn't support sub-interpreters. 1052 """ 1053 r, w = os.pipe() 1054 self.addCleanup(os.close, r) 1055 self.addCleanup(os.close, w) 1056 1057 script = textwrap.dedent(f""" 1058 import importlib.machinery 1059 import importlib.util 1060 import os 1061 1062 fullname = '_test_module_state_shared' 1063 origin = importlib.util.find_spec('_testmultiphase').origin 1064 loader = importlib.machinery.ExtensionFileLoader(fullname, origin) 1065 spec = importlib.util.spec_from_loader(fullname, loader) 1066 module = importlib.util.module_from_spec(spec) 1067 attr_id = str(id(module.Error)).encode() 1068 1069 os.write({w}, attr_id) 1070 """) 1071 exec(script) 1072 main_attr_id = os.read(r, 100) 1073 1074 ret = support.run_in_subinterp(script) 1075 self.assertEqual(ret, 0) 1076 subinterp_attr_id = os.read(r, 100) 1077 self.assertEqual(main_attr_id, subinterp_attr_id) 1078 1079 1080class TestThreadState(unittest.TestCase): 1081 1082 @threading_helper.reap_threads 1083 @threading_helper.requires_working_threading() 1084 def test_thread_state(self): 1085 # some extra thread-state tests driven via _testcapi 1086 def target(): 1087 idents = [] 1088 1089 def callback(): 1090 idents.append(threading.get_ident()) 1091 1092 _testcapi._test_thread_state(callback) 1093 a = b = callback 1094 time.sleep(1) 1095 # Check our main thread is in the list exactly 3 times. 1096 self.assertEqual(idents.count(threading.get_ident()), 3, 1097 "Couldn't find main thread correctly in the list") 1098 1099 target() 1100 t = threading.Thread(target=target) 1101 t.start() 1102 t.join() 1103 1104 @threading_helper.reap_threads 1105 @threading_helper.requires_working_threading() 1106 def test_gilstate_ensure_no_deadlock(self): 1107 # See https://github.com/python/cpython/issues/96071 1108 code = textwrap.dedent(f""" 1109 import _testcapi 1110 1111 def callback(): 1112 print('callback called') 1113 1114 _testcapi._test_thread_state(callback) 1115 """) 1116 ret = assert_python_ok('-X', 'tracemalloc', '-c', code) 1117 self.assertIn(b'callback called', ret.out) 1118 1119 1120class Test_testcapi(unittest.TestCase): 1121 locals().update((name, getattr(_testcapi, name)) 1122 for name in dir(_testcapi) 1123 if name.startswith('test_') and not name.endswith('_code')) 1124 1125 # Suppress warning from PyUnicode_FromUnicode(). 1126 @warnings_helper.ignore_warnings(category=DeprecationWarning) 1127 def test_widechar(self): 1128 _testcapi.test_widechar() 1129 1130 def test_version_api_data(self): 1131 self.assertEqual(_testcapi.Py_Version, sys.hexversion) 1132 1133 1134class Test_testinternalcapi(unittest.TestCase): 1135 locals().update((name, getattr(_testinternalcapi, name)) 1136 for name in dir(_testinternalcapi) 1137 if name.startswith('test_')) 1138 1139 1140@support.requires_subprocess() 1141class PyMemDebugTests(unittest.TestCase): 1142 PYTHONMALLOC = 'debug' 1143 # '0x04c06e0' or '04C06E0' 1144 PTR_REGEX = r'(?:0x)?[0-9a-fA-F]+' 1145 1146 def check(self, code): 1147 with support.SuppressCrashReport(): 1148 out = assert_python_failure( 1149 '-c', code, 1150 PYTHONMALLOC=self.PYTHONMALLOC, 1151 # FreeBSD: instruct jemalloc to not fill freed() memory 1152 # with junk byte 0x5a, see JEMALLOC(3) 1153 MALLOC_CONF="junk:false", 1154 ) 1155 stderr = out.err 1156 return stderr.decode('ascii', 'replace') 1157 1158 def test_buffer_overflow(self): 1159 out = self.check('import _testcapi; _testcapi.pymem_buffer_overflow()') 1160 regex = (r"Debug memory block at address p={ptr}: API 'm'\n" 1161 r" 16 bytes originally requested\n" 1162 r" The [0-9] pad bytes at p-[0-9] are FORBIDDENBYTE, as expected.\n" 1163 r" The [0-9] pad bytes at tail={ptr} are not all FORBIDDENBYTE \(0x[0-9a-f]{{2}}\):\n" 1164 r" at tail\+0: 0x78 \*\*\* OUCH\n" 1165 r" at tail\+1: 0xfd\n" 1166 r" at tail\+2: 0xfd\n" 1167 r" .*\n" 1168 r"( The block was made by call #[0-9]+ to debug malloc/realloc.\n)?" 1169 r" Data at p: cd cd cd .*\n" 1170 r"\n" 1171 r"Enable tracemalloc to get the memory block allocation traceback\n" 1172 r"\n" 1173 r"Fatal Python error: _PyMem_DebugRawFree: bad trailing pad byte") 1174 regex = regex.format(ptr=self.PTR_REGEX) 1175 regex = re.compile(regex, flags=re.DOTALL) 1176 self.assertRegex(out, regex) 1177 1178 def test_api_misuse(self): 1179 out = self.check('import _testcapi; _testcapi.pymem_api_misuse()') 1180 regex = (r"Debug memory block at address p={ptr}: API 'm'\n" 1181 r" 16 bytes originally requested\n" 1182 r" The [0-9] pad bytes at p-[0-9] are FORBIDDENBYTE, as expected.\n" 1183 r" The [0-9] pad bytes at tail={ptr} are FORBIDDENBYTE, as expected.\n" 1184 r"( The block was made by call #[0-9]+ to debug malloc/realloc.\n)?" 1185 r" Data at p: cd cd cd .*\n" 1186 r"\n" 1187 r"Enable tracemalloc to get the memory block allocation traceback\n" 1188 r"\n" 1189 r"Fatal Python error: _PyMem_DebugRawFree: bad ID: Allocated using API 'm', verified using API 'r'\n") 1190 regex = regex.format(ptr=self.PTR_REGEX) 1191 self.assertRegex(out, regex) 1192 1193 def check_malloc_without_gil(self, code): 1194 out = self.check(code) 1195 expected = ('Fatal Python error: _PyMem_DebugMalloc: ' 1196 'Python memory allocator called without holding the GIL') 1197 self.assertIn(expected, out) 1198 1199 def test_pymem_malloc_without_gil(self): 1200 # Debug hooks must raise an error if PyMem_Malloc() is called 1201 # without holding the GIL 1202 code = 'import _testcapi; _testcapi.pymem_malloc_without_gil()' 1203 self.check_malloc_without_gil(code) 1204 1205 def test_pyobject_malloc_without_gil(self): 1206 # Debug hooks must raise an error if PyObject_Malloc() is called 1207 # without holding the GIL 1208 code = 'import _testcapi; _testcapi.pyobject_malloc_without_gil()' 1209 self.check_malloc_without_gil(code) 1210 1211 def check_pyobject_is_freed(self, func_name): 1212 code = textwrap.dedent(f''' 1213 import gc, os, sys, _testcapi 1214 # Disable the GC to avoid crash on GC collection 1215 gc.disable() 1216 try: 1217 _testcapi.{func_name}() 1218 # Exit immediately to avoid a crash while deallocating 1219 # the invalid object 1220 os._exit(0) 1221 except _testcapi.error: 1222 os._exit(1) 1223 ''') 1224 assert_python_ok( 1225 '-c', code, 1226 PYTHONMALLOC=self.PYTHONMALLOC, 1227 MALLOC_CONF="junk:false", 1228 ) 1229 1230 def test_pyobject_null_is_freed(self): 1231 self.check_pyobject_is_freed('check_pyobject_null_is_freed') 1232 1233 def test_pyobject_uninitialized_is_freed(self): 1234 self.check_pyobject_is_freed('check_pyobject_uninitialized_is_freed') 1235 1236 def test_pyobject_forbidden_bytes_is_freed(self): 1237 self.check_pyobject_is_freed('check_pyobject_forbidden_bytes_is_freed') 1238 1239 def test_pyobject_freed_is_freed(self): 1240 self.check_pyobject_is_freed('check_pyobject_freed_is_freed') 1241 1242 1243class PyMemMallocDebugTests(PyMemDebugTests): 1244 PYTHONMALLOC = 'malloc_debug' 1245 1246 1247@unittest.skipUnless(support.with_pymalloc(), 'need pymalloc') 1248class PyMemPymallocDebugTests(PyMemDebugTests): 1249 PYTHONMALLOC = 'pymalloc_debug' 1250 1251 1252@unittest.skipUnless(Py_DEBUG, 'need Py_DEBUG') 1253class PyMemDefaultTests(PyMemDebugTests): 1254 # test default allocator of Python compiled in debug mode 1255 PYTHONMALLOC = '' 1256 1257 1258@unittest.skipIf(_testmultiphase is None, "test requires _testmultiphase module") 1259class Test_ModuleStateAccess(unittest.TestCase): 1260 """Test access to module start (PEP 573)""" 1261 1262 # The C part of the tests lives in _testmultiphase, in a module called 1263 # _testmultiphase_meth_state_access. 1264 # This module has multi-phase initialization, unlike _testcapi. 1265 1266 def setUp(self): 1267 fullname = '_testmultiphase_meth_state_access' # XXX 1268 origin = importlib.util.find_spec('_testmultiphase').origin 1269 loader = importlib.machinery.ExtensionFileLoader(fullname, origin) 1270 spec = importlib.util.spec_from_loader(fullname, loader) 1271 module = importlib.util.module_from_spec(spec) 1272 loader.exec_module(module) 1273 self.module = module 1274 1275 def test_subclass_get_module(self): 1276 """PyType_GetModule for defining_class""" 1277 class StateAccessType_Subclass(self.module.StateAccessType): 1278 pass 1279 1280 instance = StateAccessType_Subclass() 1281 self.assertIs(instance.get_defining_module(), self.module) 1282 1283 def test_subclass_get_module_with_super(self): 1284 class StateAccessType_Subclass(self.module.StateAccessType): 1285 def get_defining_module(self): 1286 return super().get_defining_module() 1287 1288 instance = StateAccessType_Subclass() 1289 self.assertIs(instance.get_defining_module(), self.module) 1290 1291 def test_state_access(self): 1292 """Checks methods defined with and without argument clinic 1293 1294 This tests a no-arg method (get_count) and a method with 1295 both a positional and keyword argument. 1296 """ 1297 1298 a = self.module.StateAccessType() 1299 b = self.module.StateAccessType() 1300 1301 methods = { 1302 'clinic': a.increment_count_clinic, 1303 'noclinic': a.increment_count_noclinic, 1304 } 1305 1306 for name, increment_count in methods.items(): 1307 with self.subTest(name): 1308 self.assertEqual(a.get_count(), b.get_count()) 1309 self.assertEqual(a.get_count(), 0) 1310 1311 increment_count() 1312 self.assertEqual(a.get_count(), b.get_count()) 1313 self.assertEqual(a.get_count(), 1) 1314 1315 increment_count(3) 1316 self.assertEqual(a.get_count(), b.get_count()) 1317 self.assertEqual(a.get_count(), 4) 1318 1319 increment_count(-2, twice=True) 1320 self.assertEqual(a.get_count(), b.get_count()) 1321 self.assertEqual(a.get_count(), 0) 1322 1323 with self.assertRaises(TypeError): 1324 increment_count(thrice=3) 1325 1326 with self.assertRaises(TypeError): 1327 increment_count(1, 2, 3) 1328 1329 def test_get_module_bad_def(self): 1330 # PyType_GetModuleByDef fails gracefully if it doesn't 1331 # find what it's looking for. 1332 # see bpo-46433 1333 instance = self.module.StateAccessType() 1334 with self.assertRaises(TypeError): 1335 instance.getmodulebydef_bad_def() 1336 1337 def test_get_module_static_in_mro(self): 1338 # Here, the class PyType_GetModuleByDef is looking for 1339 # appears in the MRO after a static type (Exception). 1340 # see bpo-46433 1341 class Subclass(BaseException, self.module.StateAccessType): 1342 pass 1343 self.assertIs(Subclass().get_defining_module(), self.module) 1344 1345 1346class Test_FrameAPI(unittest.TestCase): 1347 1348 def getframe(self): 1349 return sys._getframe() 1350 1351 def getgenframe(self): 1352 yield sys._getframe() 1353 1354 def test_frame_getters(self): 1355 frame = self.getframe() 1356 self.assertEqual(frame.f_locals, _testcapi.frame_getlocals(frame)) 1357 self.assertIs(frame.f_globals, _testcapi.frame_getglobals(frame)) 1358 self.assertIs(frame.f_builtins, _testcapi.frame_getbuiltins(frame)) 1359 self.assertEqual(frame.f_lasti, _testcapi.frame_getlasti(frame)) 1360 1361 def test_frame_get_generator(self): 1362 gen = self.getgenframe() 1363 frame = next(gen) 1364 self.assertIs(gen, _testcapi.frame_getgenerator(frame)) 1365 1366 def test_frame_fback_api(self): 1367 """Test that accessing `f_back` does not cause a segmentation fault on 1368 a frame created with `PyFrame_New` (GH-99110).""" 1369 def dummy(): 1370 pass 1371 1372 frame = _testcapi.frame_new(dummy.__code__, globals(), locals()) 1373 # The following line should not cause a segmentation fault. 1374 self.assertIsNone(frame.f_back) 1375 1376 1377SUFFICIENT_TO_DEOPT_AND_SPECIALIZE = 100 1378 1379class Test_Pep523API(unittest.TestCase): 1380 1381 def do_test(self, func, names): 1382 actual_calls = [] 1383 start = SUFFICIENT_TO_DEOPT_AND_SPECIALIZE 1384 count = start + SUFFICIENT_TO_DEOPT_AND_SPECIALIZE 1385 try: 1386 for i in range(count): 1387 if i == start: 1388 _testinternalcapi.set_eval_frame_record(actual_calls) 1389 func() 1390 finally: 1391 _testinternalcapi.set_eval_frame_default() 1392 expected_calls = names * SUFFICIENT_TO_DEOPT_AND_SPECIALIZE 1393 self.assertEqual(len(expected_calls), len(actual_calls)) 1394 for expected, actual in zip(expected_calls, actual_calls, strict=True): 1395 self.assertEqual(expected, actual) 1396 1397 def test_inlined_binary_subscr(self): 1398 class C: 1399 def __getitem__(self, other): 1400 return None 1401 def func(): 1402 C()[42] 1403 names = ["func", "__getitem__"] 1404 self.do_test(func, names) 1405 1406 def test_inlined_call(self): 1407 def inner(x=42): 1408 pass 1409 def func(): 1410 inner() 1411 inner(42) 1412 names = ["func", "inner", "inner"] 1413 self.do_test(func, names) 1414 1415 1416if __name__ == "__main__": 1417 unittest.main() 1418