1""" 2Tests for the threading module. 3""" 4 5import test.support 6from test.support import threading_helper, requires_subprocess 7from test.support import verbose, cpython_only, os_helper 8from test.support.import_helper import import_module 9from test.support.script_helper import assert_python_ok, assert_python_failure 10 11import random 12import sys 13import _thread 14import threading 15import time 16import unittest 17import weakref 18import os 19import subprocess 20import signal 21import textwrap 22import traceback 23 24from unittest import mock 25from test import lock_tests 26from test import support 27 28threading_helper.requires_working_threading(module=True) 29 30# Between fork() and exec(), only async-safe functions are allowed (issues 31# #12316 and #11870), and fork() from a worker thread is known to trigger 32# problems with some operating systems (issue #3863): skip problematic tests 33# on platforms known to behave badly. 34platforms_to_skip = ('netbsd5', 'hp-ux11') 35 36# Is Python built with Py_DEBUG macro defined? 37Py_DEBUG = hasattr(sys, 'gettotalrefcount') 38 39 40def restore_default_excepthook(testcase): 41 testcase.addCleanup(setattr, threading, 'excepthook', threading.excepthook) 42 threading.excepthook = threading.__excepthook__ 43 44 45# A trivial mutable counter. 46class Counter(object): 47 def __init__(self): 48 self.value = 0 49 def inc(self): 50 self.value += 1 51 def dec(self): 52 self.value -= 1 53 def get(self): 54 return self.value 55 56class TestThread(threading.Thread): 57 def __init__(self, name, testcase, sema, mutex, nrunning): 58 threading.Thread.__init__(self, name=name) 59 self.testcase = testcase 60 self.sema = sema 61 self.mutex = mutex 62 self.nrunning = nrunning 63 64 def run(self): 65 delay = random.random() / 10000.0 66 if verbose: 67 print('task %s will run for %.1f usec' % 68 (self.name, delay * 1e6)) 69 70 with self.sema: 71 with self.mutex: 72 self.nrunning.inc() 73 if verbose: 74 print(self.nrunning.get(), 'tasks are running') 75 self.testcase.assertLessEqual(self.nrunning.get(), 3) 76 77 time.sleep(delay) 78 if verbose: 79 print('task', self.name, 'done') 80 81 with self.mutex: 82 self.nrunning.dec() 83 self.testcase.assertGreaterEqual(self.nrunning.get(), 0) 84 if verbose: 85 print('%s is finished. %d tasks are running' % 86 (self.name, self.nrunning.get())) 87 88 89class BaseTestCase(unittest.TestCase): 90 def setUp(self): 91 self._threads = threading_helper.threading_setup() 92 93 def tearDown(self): 94 threading_helper.threading_cleanup(*self._threads) 95 test.support.reap_children() 96 97 98class ThreadTests(BaseTestCase): 99 100 @cpython_only 101 def test_name(self): 102 def func(): pass 103 104 thread = threading.Thread(name="myname1") 105 self.assertEqual(thread.name, "myname1") 106 107 # Convert int name to str 108 thread = threading.Thread(name=123) 109 self.assertEqual(thread.name, "123") 110 111 # target name is ignored if name is specified 112 thread = threading.Thread(target=func, name="myname2") 113 self.assertEqual(thread.name, "myname2") 114 115 with mock.patch.object(threading, '_counter', return_value=2): 116 thread = threading.Thread(name="") 117 self.assertEqual(thread.name, "Thread-2") 118 119 with mock.patch.object(threading, '_counter', return_value=3): 120 thread = threading.Thread() 121 self.assertEqual(thread.name, "Thread-3") 122 123 with mock.patch.object(threading, '_counter', return_value=5): 124 thread = threading.Thread(target=func) 125 self.assertEqual(thread.name, "Thread-5 (func)") 126 127 def test_args_argument(self): 128 # bpo-45735: Using list or tuple as *args* in constructor could 129 # achieve the same effect. 130 num_list = [1] 131 num_tuple = (1,) 132 133 str_list = ["str"] 134 str_tuple = ("str",) 135 136 list_in_tuple = ([1],) 137 tuple_in_list = [(1,)] 138 139 test_cases = ( 140 (num_list, lambda arg: self.assertEqual(arg, 1)), 141 (num_tuple, lambda arg: self.assertEqual(arg, 1)), 142 (str_list, lambda arg: self.assertEqual(arg, "str")), 143 (str_tuple, lambda arg: self.assertEqual(arg, "str")), 144 (list_in_tuple, lambda arg: self.assertEqual(arg, [1])), 145 (tuple_in_list, lambda arg: self.assertEqual(arg, (1,))) 146 ) 147 148 for args, target in test_cases: 149 with self.subTest(target=target, args=args): 150 t = threading.Thread(target=target, args=args) 151 t.start() 152 t.join() 153 154 @cpython_only 155 def test_disallow_instantiation(self): 156 # Ensure that the type disallows instantiation (bpo-43916) 157 lock = threading.Lock() 158 test.support.check_disallow_instantiation(self, type(lock)) 159 160 # Create a bunch of threads, let each do some work, wait until all are 161 # done. 162 def test_various_ops(self): 163 # This takes about n/3 seconds to run (about n/3 clumps of tasks, 164 # times about 1 second per clump). 165 NUMTASKS = 10 166 167 # no more than 3 of the 10 can run at once 168 sema = threading.BoundedSemaphore(value=3) 169 mutex = threading.RLock() 170 numrunning = Counter() 171 172 threads = [] 173 174 for i in range(NUMTASKS): 175 t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning) 176 threads.append(t) 177 self.assertIsNone(t.ident) 178 self.assertRegex(repr(t), r'^<TestThread\(.*, initial\)>$') 179 t.start() 180 181 if hasattr(threading, 'get_native_id'): 182 native_ids = set(t.native_id for t in threads) | {threading.get_native_id()} 183 self.assertNotIn(None, native_ids) 184 self.assertEqual(len(native_ids), NUMTASKS + 1) 185 186 if verbose: 187 print('waiting for all tasks to complete') 188 for t in threads: 189 t.join() 190 self.assertFalse(t.is_alive()) 191 self.assertNotEqual(t.ident, 0) 192 self.assertIsNotNone(t.ident) 193 self.assertRegex(repr(t), r'^<TestThread\(.*, stopped -?\d+\)>$') 194 if verbose: 195 print('all tasks done') 196 self.assertEqual(numrunning.get(), 0) 197 198 def test_ident_of_no_threading_threads(self): 199 # The ident still must work for the main thread and dummy threads. 200 self.assertIsNotNone(threading.current_thread().ident) 201 def f(): 202 ident.append(threading.current_thread().ident) 203 done.set() 204 done = threading.Event() 205 ident = [] 206 with threading_helper.wait_threads_exit(): 207 tid = _thread.start_new_thread(f, ()) 208 done.wait() 209 self.assertEqual(ident[0], tid) 210 # Kill the "immortal" _DummyThread 211 del threading._active[ident[0]] 212 213 # run with a small(ish) thread stack size (256 KiB) 214 def test_various_ops_small_stack(self): 215 if verbose: 216 print('with 256 KiB thread stack size...') 217 try: 218 threading.stack_size(262144) 219 except _thread.error: 220 raise unittest.SkipTest( 221 'platform does not support changing thread stack size') 222 self.test_various_ops() 223 threading.stack_size(0) 224 225 # run with a large thread stack size (1 MiB) 226 def test_various_ops_large_stack(self): 227 if verbose: 228 print('with 1 MiB thread stack size...') 229 try: 230 threading.stack_size(0x100000) 231 except _thread.error: 232 raise unittest.SkipTest( 233 'platform does not support changing thread stack size') 234 self.test_various_ops() 235 threading.stack_size(0) 236 237 def test_foreign_thread(self): 238 # Check that a "foreign" thread can use the threading module. 239 def f(mutex): 240 # Calling current_thread() forces an entry for the foreign 241 # thread to get made in the threading._active map. 242 threading.current_thread() 243 mutex.release() 244 245 mutex = threading.Lock() 246 mutex.acquire() 247 with threading_helper.wait_threads_exit(): 248 tid = _thread.start_new_thread(f, (mutex,)) 249 # Wait for the thread to finish. 250 mutex.acquire() 251 self.assertIn(tid, threading._active) 252 self.assertIsInstance(threading._active[tid], threading._DummyThread) 253 #Issue 29376 254 self.assertTrue(threading._active[tid].is_alive()) 255 self.assertRegex(repr(threading._active[tid]), '_DummyThread') 256 del threading._active[tid] 257 258 # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently) 259 # exposed at the Python level. This test relies on ctypes to get at it. 260 def test_PyThreadState_SetAsyncExc(self): 261 ctypes = import_module("ctypes") 262 263 set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc 264 set_async_exc.argtypes = (ctypes.c_ulong, ctypes.py_object) 265 266 class AsyncExc(Exception): 267 pass 268 269 exception = ctypes.py_object(AsyncExc) 270 271 # First check it works when setting the exception from the same thread. 272 tid = threading.get_ident() 273 self.assertIsInstance(tid, int) 274 self.assertGreater(tid, 0) 275 276 try: 277 result = set_async_exc(tid, exception) 278 # The exception is async, so we might have to keep the VM busy until 279 # it notices. 280 while True: 281 pass 282 except AsyncExc: 283 pass 284 else: 285 # This code is unreachable but it reflects the intent. If we wanted 286 # to be smarter the above loop wouldn't be infinite. 287 self.fail("AsyncExc not raised") 288 try: 289 self.assertEqual(result, 1) # one thread state modified 290 except UnboundLocalError: 291 # The exception was raised too quickly for us to get the result. 292 pass 293 294 # `worker_started` is set by the thread when it's inside a try/except 295 # block waiting to catch the asynchronously set AsyncExc exception. 296 # `worker_saw_exception` is set by the thread upon catching that 297 # exception. 298 worker_started = threading.Event() 299 worker_saw_exception = threading.Event() 300 301 class Worker(threading.Thread): 302 def run(self): 303 self.id = threading.get_ident() 304 self.finished = False 305 306 try: 307 while True: 308 worker_started.set() 309 time.sleep(0.1) 310 except AsyncExc: 311 self.finished = True 312 worker_saw_exception.set() 313 314 t = Worker() 315 t.daemon = True # so if this fails, we don't hang Python at shutdown 316 t.start() 317 if verbose: 318 print(" started worker thread") 319 320 # Try a thread id that doesn't make sense. 321 if verbose: 322 print(" trying nonsensical thread id") 323 result = set_async_exc(-1, exception) 324 self.assertEqual(result, 0) # no thread states modified 325 326 # Now raise an exception in the worker thread. 327 if verbose: 328 print(" waiting for worker thread to get started") 329 ret = worker_started.wait() 330 self.assertTrue(ret) 331 if verbose: 332 print(" verifying worker hasn't exited") 333 self.assertFalse(t.finished) 334 if verbose: 335 print(" attempting to raise asynch exception in worker") 336 result = set_async_exc(t.id, exception) 337 self.assertEqual(result, 1) # one thread state modified 338 if verbose: 339 print(" waiting for worker to say it caught the exception") 340 worker_saw_exception.wait(timeout=support.SHORT_TIMEOUT) 341 self.assertTrue(t.finished) 342 if verbose: 343 print(" all OK -- joining worker") 344 if t.finished: 345 t.join() 346 # else the thread is still running, and we have no way to kill it 347 348 def test_limbo_cleanup(self): 349 # Issue 7481: Failure to start thread should cleanup the limbo map. 350 def fail_new_thread(*args): 351 raise threading.ThreadError() 352 _start_new_thread = threading._start_new_thread 353 threading._start_new_thread = fail_new_thread 354 try: 355 t = threading.Thread(target=lambda: None) 356 self.assertRaises(threading.ThreadError, t.start) 357 self.assertFalse( 358 t in threading._limbo, 359 "Failed to cleanup _limbo map on failure of Thread.start().") 360 finally: 361 threading._start_new_thread = _start_new_thread 362 363 def test_finalize_running_thread(self): 364 # Issue 1402: the PyGILState_Ensure / _Release functions may be called 365 # very late on python exit: on deallocation of a running thread for 366 # example. 367 import_module("ctypes") 368 369 rc, out, err = assert_python_failure("-c", """if 1: 370 import ctypes, sys, time, _thread 371 372 # This lock is used as a simple event variable. 373 ready = _thread.allocate_lock() 374 ready.acquire() 375 376 # Module globals are cleared before __del__ is run 377 # So we save the functions in class dict 378 class C: 379 ensure = ctypes.pythonapi.PyGILState_Ensure 380 release = ctypes.pythonapi.PyGILState_Release 381 def __del__(self): 382 state = self.ensure() 383 self.release(state) 384 385 def waitingThread(): 386 x = C() 387 ready.release() 388 time.sleep(100) 389 390 _thread.start_new_thread(waitingThread, ()) 391 ready.acquire() # Be sure the other thread is waiting. 392 sys.exit(42) 393 """) 394 self.assertEqual(rc, 42) 395 396 def test_finalize_with_trace(self): 397 # Issue1733757 398 # Avoid a deadlock when sys.settrace steps into threading._shutdown 399 assert_python_ok("-c", """if 1: 400 import sys, threading 401 402 # A deadlock-killer, to prevent the 403 # testsuite to hang forever 404 def killer(): 405 import os, time 406 time.sleep(2) 407 print('program blocked; aborting') 408 os._exit(2) 409 t = threading.Thread(target=killer) 410 t.daemon = True 411 t.start() 412 413 # This is the trace function 414 def func(frame, event, arg): 415 threading.current_thread() 416 return func 417 418 sys.settrace(func) 419 """) 420 421 def test_join_nondaemon_on_shutdown(self): 422 # Issue 1722344 423 # Raising SystemExit skipped threading._shutdown 424 rc, out, err = assert_python_ok("-c", """if 1: 425 import threading 426 from time import sleep 427 428 def child(): 429 sleep(1) 430 # As a non-daemon thread we SHOULD wake up and nothing 431 # should be torn down yet 432 print("Woke up, sleep function is:", sleep) 433 434 threading.Thread(target=child).start() 435 raise SystemExit 436 """) 437 self.assertEqual(out.strip(), 438 b"Woke up, sleep function is: <built-in function sleep>") 439 self.assertEqual(err, b"") 440 441 def test_enumerate_after_join(self): 442 # Try hard to trigger #1703448: a thread is still returned in 443 # threading.enumerate() after it has been join()ed. 444 enum = threading.enumerate 445 old_interval = sys.getswitchinterval() 446 try: 447 for i in range(1, 100): 448 sys.setswitchinterval(i * 0.0002) 449 t = threading.Thread(target=lambda: None) 450 t.start() 451 t.join() 452 l = enum() 453 self.assertNotIn(t, l, 454 "#1703448 triggered after %d trials: %s" % (i, l)) 455 finally: 456 sys.setswitchinterval(old_interval) 457 458 def test_no_refcycle_through_target(self): 459 class RunSelfFunction(object): 460 def __init__(self, should_raise): 461 # The links in this refcycle from Thread back to self 462 # should be cleaned up when the thread completes. 463 self.should_raise = should_raise 464 self.thread = threading.Thread(target=self._run, 465 args=(self,), 466 kwargs={'yet_another':self}) 467 self.thread.start() 468 469 def _run(self, other_ref, yet_another): 470 if self.should_raise: 471 raise SystemExit 472 473 restore_default_excepthook(self) 474 475 cyclic_object = RunSelfFunction(should_raise=False) 476 weak_cyclic_object = weakref.ref(cyclic_object) 477 cyclic_object.thread.join() 478 del cyclic_object 479 self.assertIsNone(weak_cyclic_object(), 480 msg=('%d references still around' % 481 sys.getrefcount(weak_cyclic_object()))) 482 483 raising_cyclic_object = RunSelfFunction(should_raise=True) 484 weak_raising_cyclic_object = weakref.ref(raising_cyclic_object) 485 raising_cyclic_object.thread.join() 486 del raising_cyclic_object 487 self.assertIsNone(weak_raising_cyclic_object(), 488 msg=('%d references still around' % 489 sys.getrefcount(weak_raising_cyclic_object()))) 490 491 def test_old_threading_api(self): 492 # Just a quick sanity check to make sure the old method names are 493 # still present 494 t = threading.Thread() 495 with self.assertWarnsRegex(DeprecationWarning, 496 r'get the daemon attribute'): 497 t.isDaemon() 498 with self.assertWarnsRegex(DeprecationWarning, 499 r'set the daemon attribute'): 500 t.setDaemon(True) 501 with self.assertWarnsRegex(DeprecationWarning, 502 r'get the name attribute'): 503 t.getName() 504 with self.assertWarnsRegex(DeprecationWarning, 505 r'set the name attribute'): 506 t.setName("name") 507 508 e = threading.Event() 509 with self.assertWarnsRegex(DeprecationWarning, 'use is_set()'): 510 e.isSet() 511 512 cond = threading.Condition() 513 cond.acquire() 514 with self.assertWarnsRegex(DeprecationWarning, 'use notify_all()'): 515 cond.notifyAll() 516 517 with self.assertWarnsRegex(DeprecationWarning, 'use active_count()'): 518 threading.activeCount() 519 with self.assertWarnsRegex(DeprecationWarning, 'use current_thread()'): 520 threading.currentThread() 521 522 def test_repr_daemon(self): 523 t = threading.Thread() 524 self.assertNotIn('daemon', repr(t)) 525 t.daemon = True 526 self.assertIn('daemon', repr(t)) 527 528 def test_daemon_param(self): 529 t = threading.Thread() 530 self.assertFalse(t.daemon) 531 t = threading.Thread(daemon=False) 532 self.assertFalse(t.daemon) 533 t = threading.Thread(daemon=True) 534 self.assertTrue(t.daemon) 535 536 @support.requires_fork() 537 def test_fork_at_exit(self): 538 # bpo-42350: Calling os.fork() after threading._shutdown() must 539 # not log an error. 540 code = textwrap.dedent(""" 541 import atexit 542 import os 543 import sys 544 from test.support import wait_process 545 546 # Import the threading module to register its "at fork" callback 547 import threading 548 549 def exit_handler(): 550 pid = os.fork() 551 if not pid: 552 print("child process ok", file=sys.stderr, flush=True) 553 # child process 554 else: 555 wait_process(pid, exitcode=0) 556 557 # exit_handler() will be called after threading._shutdown() 558 atexit.register(exit_handler) 559 """) 560 _, out, err = assert_python_ok("-c", code) 561 self.assertEqual(out, b'') 562 self.assertEqual(err.rstrip(), b'child process ok') 563 564 @support.requires_fork() 565 def test_dummy_thread_after_fork(self): 566 # Issue #14308: a dummy thread in the active list doesn't mess up 567 # the after-fork mechanism. 568 code = """if 1: 569 import _thread, threading, os, time 570 571 def background_thread(evt): 572 # Creates and registers the _DummyThread instance 573 threading.current_thread() 574 evt.set() 575 time.sleep(10) 576 577 evt = threading.Event() 578 _thread.start_new_thread(background_thread, (evt,)) 579 evt.wait() 580 assert threading.active_count() == 2, threading.active_count() 581 if os.fork() == 0: 582 assert threading.active_count() == 1, threading.active_count() 583 os._exit(0) 584 else: 585 os.wait() 586 """ 587 _, out, err = assert_python_ok("-c", code) 588 self.assertEqual(out, b'') 589 self.assertEqual(err, b'') 590 591 @support.requires_fork() 592 def test_is_alive_after_fork(self): 593 # Try hard to trigger #18418: is_alive() could sometimes be True on 594 # threads that vanished after a fork. 595 old_interval = sys.getswitchinterval() 596 self.addCleanup(sys.setswitchinterval, old_interval) 597 598 # Make the bug more likely to manifest. 599 test.support.setswitchinterval(1e-6) 600 601 for i in range(20): 602 t = threading.Thread(target=lambda: None) 603 t.start() 604 pid = os.fork() 605 if pid == 0: 606 os._exit(11 if t.is_alive() else 10) 607 else: 608 t.join() 609 610 support.wait_process(pid, exitcode=10) 611 612 def test_main_thread(self): 613 main = threading.main_thread() 614 self.assertEqual(main.name, 'MainThread') 615 self.assertEqual(main.ident, threading.current_thread().ident) 616 self.assertEqual(main.ident, threading.get_ident()) 617 618 def f(): 619 self.assertNotEqual(threading.main_thread().ident, 620 threading.current_thread().ident) 621 th = threading.Thread(target=f) 622 th.start() 623 th.join() 624 625 @support.requires_fork() 626 @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()") 627 def test_main_thread_after_fork(self): 628 code = """if 1: 629 import os, threading 630 from test import support 631 632 pid = os.fork() 633 if pid == 0: 634 main = threading.main_thread() 635 print(main.name) 636 print(main.ident == threading.current_thread().ident) 637 print(main.ident == threading.get_ident()) 638 else: 639 support.wait_process(pid, exitcode=0) 640 """ 641 _, out, err = assert_python_ok("-c", code) 642 data = out.decode().replace('\r', '') 643 self.assertEqual(err, b"") 644 self.assertEqual(data, "MainThread\nTrue\nTrue\n") 645 646 @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") 647 @support.requires_fork() 648 @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()") 649 def test_main_thread_after_fork_from_nonmain_thread(self): 650 code = """if 1: 651 import os, threading, sys 652 from test import support 653 654 def func(): 655 pid = os.fork() 656 if pid == 0: 657 main = threading.main_thread() 658 print(main.name) 659 print(main.ident == threading.current_thread().ident) 660 print(main.ident == threading.get_ident()) 661 # stdout is fully buffered because not a tty, 662 # we have to flush before exit. 663 sys.stdout.flush() 664 else: 665 support.wait_process(pid, exitcode=0) 666 667 th = threading.Thread(target=func) 668 th.start() 669 th.join() 670 """ 671 _, out, err = assert_python_ok("-c", code) 672 data = out.decode().replace('\r', '') 673 self.assertEqual(err, b"") 674 self.assertEqual(data, "Thread-1 (func)\nTrue\nTrue\n") 675 676 def test_main_thread_during_shutdown(self): 677 # bpo-31516: current_thread() should still point to the main thread 678 # at shutdown 679 code = """if 1: 680 import gc, threading 681 682 main_thread = threading.current_thread() 683 assert main_thread is threading.main_thread() # sanity check 684 685 class RefCycle: 686 def __init__(self): 687 self.cycle = self 688 689 def __del__(self): 690 print("GC:", 691 threading.current_thread() is main_thread, 692 threading.main_thread() is main_thread, 693 threading.enumerate() == [main_thread]) 694 695 RefCycle() 696 gc.collect() # sanity check 697 x = RefCycle() 698 """ 699 _, out, err = assert_python_ok("-c", code) 700 data = out.decode() 701 self.assertEqual(err, b"") 702 self.assertEqual(data.splitlines(), 703 ["GC: True True True"] * 2) 704 705 def test_finalization_shutdown(self): 706 # bpo-36402: Py_Finalize() calls threading._shutdown() which must wait 707 # until Python thread states of all non-daemon threads get deleted. 708 # 709 # Test similar to SubinterpThreadingTests.test_threads_join_2(), but 710 # test the finalization of the main interpreter. 711 code = """if 1: 712 import os 713 import threading 714 import time 715 import random 716 717 def random_sleep(): 718 seconds = random.random() * 0.010 719 time.sleep(seconds) 720 721 class Sleeper: 722 def __del__(self): 723 random_sleep() 724 725 tls = threading.local() 726 727 def f(): 728 # Sleep a bit so that the thread is still running when 729 # Py_Finalize() is called. 730 random_sleep() 731 tls.x = Sleeper() 732 random_sleep() 733 734 threading.Thread(target=f).start() 735 random_sleep() 736 """ 737 rc, out, err = assert_python_ok("-c", code) 738 self.assertEqual(err, b"") 739 740 def test_tstate_lock(self): 741 # Test an implementation detail of Thread objects. 742 started = _thread.allocate_lock() 743 finish = _thread.allocate_lock() 744 started.acquire() 745 finish.acquire() 746 def f(): 747 started.release() 748 finish.acquire() 749 time.sleep(0.01) 750 # The tstate lock is None until the thread is started 751 t = threading.Thread(target=f) 752 self.assertIs(t._tstate_lock, None) 753 t.start() 754 started.acquire() 755 self.assertTrue(t.is_alive()) 756 # The tstate lock can't be acquired when the thread is running 757 # (or suspended). 758 tstate_lock = t._tstate_lock 759 self.assertFalse(tstate_lock.acquire(timeout=0), False) 760 finish.release() 761 # When the thread ends, the state_lock can be successfully 762 # acquired. 763 self.assertTrue(tstate_lock.acquire(timeout=support.SHORT_TIMEOUT), False) 764 # But is_alive() is still True: we hold _tstate_lock now, which 765 # prevents is_alive() from knowing the thread's end-of-life C code 766 # is done. 767 self.assertTrue(t.is_alive()) 768 # Let is_alive() find out the C code is done. 769 tstate_lock.release() 770 self.assertFalse(t.is_alive()) 771 # And verify the thread disposed of _tstate_lock. 772 self.assertIsNone(t._tstate_lock) 773 t.join() 774 775 def test_repr_stopped(self): 776 # Verify that "stopped" shows up in repr(Thread) appropriately. 777 started = _thread.allocate_lock() 778 finish = _thread.allocate_lock() 779 started.acquire() 780 finish.acquire() 781 def f(): 782 started.release() 783 finish.acquire() 784 t = threading.Thread(target=f) 785 t.start() 786 started.acquire() 787 self.assertIn("started", repr(t)) 788 finish.release() 789 # "stopped" should appear in the repr in a reasonable amount of time. 790 # Implementation detail: as of this writing, that's trivially true 791 # if .join() is called, and almost trivially true if .is_alive() is 792 # called. The detail we're testing here is that "stopped" shows up 793 # "all on its own". 794 LOOKING_FOR = "stopped" 795 for i in range(500): 796 if LOOKING_FOR in repr(t): 797 break 798 time.sleep(0.01) 799 self.assertIn(LOOKING_FOR, repr(t)) # we waited at least 5 seconds 800 t.join() 801 802 def test_BoundedSemaphore_limit(self): 803 # BoundedSemaphore should raise ValueError if released too often. 804 for limit in range(1, 10): 805 bs = threading.BoundedSemaphore(limit) 806 threads = [threading.Thread(target=bs.acquire) 807 for _ in range(limit)] 808 for t in threads: 809 t.start() 810 for t in threads: 811 t.join() 812 threads = [threading.Thread(target=bs.release) 813 for _ in range(limit)] 814 for t in threads: 815 t.start() 816 for t in threads: 817 t.join() 818 self.assertRaises(ValueError, bs.release) 819 820 @cpython_only 821 def test_frame_tstate_tracing(self): 822 # Issue #14432: Crash when a generator is created in a C thread that is 823 # destroyed while the generator is still used. The issue was that a 824 # generator contains a frame, and the frame kept a reference to the 825 # Python state of the destroyed C thread. The crash occurs when a trace 826 # function is setup. 827 828 def noop_trace(frame, event, arg): 829 # no operation 830 return noop_trace 831 832 def generator(): 833 while 1: 834 yield "generator" 835 836 def callback(): 837 if callback.gen is None: 838 callback.gen = generator() 839 return next(callback.gen) 840 callback.gen = None 841 842 old_trace = sys.gettrace() 843 sys.settrace(noop_trace) 844 try: 845 # Install a trace function 846 threading.settrace(noop_trace) 847 848 # Create a generator in a C thread which exits after the call 849 import _testcapi 850 _testcapi.call_in_temporary_c_thread(callback) 851 852 # Call the generator in a different Python thread, check that the 853 # generator didn't keep a reference to the destroyed thread state 854 for test in range(3): 855 # The trace function is still called here 856 callback() 857 finally: 858 sys.settrace(old_trace) 859 860 def test_gettrace(self): 861 def noop_trace(frame, event, arg): 862 # no operation 863 return noop_trace 864 old_trace = threading.gettrace() 865 try: 866 threading.settrace(noop_trace) 867 trace_func = threading.gettrace() 868 self.assertEqual(noop_trace,trace_func) 869 finally: 870 threading.settrace(old_trace) 871 872 def test_getprofile(self): 873 def fn(*args): pass 874 old_profile = threading.getprofile() 875 try: 876 threading.setprofile(fn) 877 self.assertEqual(fn, threading.getprofile()) 878 finally: 879 threading.setprofile(old_profile) 880 881 @cpython_only 882 def test_shutdown_locks(self): 883 for daemon in (False, True): 884 with self.subTest(daemon=daemon): 885 event = threading.Event() 886 thread = threading.Thread(target=event.wait, daemon=daemon) 887 888 # Thread.start() must add lock to _shutdown_locks, 889 # but only for non-daemon thread 890 thread.start() 891 tstate_lock = thread._tstate_lock 892 if not daemon: 893 self.assertIn(tstate_lock, threading._shutdown_locks) 894 else: 895 self.assertNotIn(tstate_lock, threading._shutdown_locks) 896 897 # unblock the thread and join it 898 event.set() 899 thread.join() 900 901 # Thread._stop() must remove tstate_lock from _shutdown_locks. 902 # Daemon threads must never add it to _shutdown_locks. 903 self.assertNotIn(tstate_lock, threading._shutdown_locks) 904 905 def test_locals_at_exit(self): 906 # bpo-19466: thread locals must not be deleted before destructors 907 # are called 908 rc, out, err = assert_python_ok("-c", """if 1: 909 import threading 910 911 class Atexit: 912 def __del__(self): 913 print("thread_dict.atexit = %r" % thread_dict.atexit) 914 915 thread_dict = threading.local() 916 thread_dict.atexit = "value" 917 918 atexit = Atexit() 919 """) 920 self.assertEqual(out.rstrip(), b"thread_dict.atexit = 'value'") 921 922 def test_boolean_target(self): 923 # bpo-41149: A thread that had a boolean value of False would not 924 # run, regardless of whether it was callable. The correct behaviour 925 # is for a thread to do nothing if its target is None, and to call 926 # the target otherwise. 927 class BooleanTarget(object): 928 def __init__(self): 929 self.ran = False 930 def __bool__(self): 931 return False 932 def __call__(self): 933 self.ran = True 934 935 target = BooleanTarget() 936 thread = threading.Thread(target=target) 937 thread.start() 938 thread.join() 939 self.assertTrue(target.ran) 940 941 def test_leak_without_join(self): 942 # bpo-37788: Test that a thread which is not joined explicitly 943 # does not leak. Test written for reference leak checks. 944 def noop(): pass 945 with threading_helper.wait_threads_exit(): 946 threading.Thread(target=noop).start() 947 # Thread.join() is not called 948 949 @unittest.skipUnless(Py_DEBUG, 'need debug build (Py_DEBUG)') 950 def test_debug_deprecation(self): 951 # bpo-44584: The PYTHONTHREADDEBUG environment variable is deprecated 952 rc, out, err = assert_python_ok("-Wdefault", "-c", "pass", 953 PYTHONTHREADDEBUG="1") 954 msg = (b'DeprecationWarning: The threading debug ' 955 b'(PYTHONTHREADDEBUG environment variable) ' 956 b'is deprecated and will be removed in Python 3.12') 957 self.assertIn(msg, err) 958 959 def test_import_from_another_thread(self): 960 # bpo-1596321: If the threading module is first import from a thread 961 # different than the main thread, threading._shutdown() must handle 962 # this case without logging an error at Python exit. 963 code = textwrap.dedent(''' 964 import _thread 965 import sys 966 967 event = _thread.allocate_lock() 968 event.acquire() 969 970 def import_threading(): 971 import threading 972 event.release() 973 974 if 'threading' in sys.modules: 975 raise Exception('threading is already imported') 976 977 _thread.start_new_thread(import_threading, ()) 978 979 # wait until the threading module is imported 980 event.acquire() 981 event.release() 982 983 if 'threading' not in sys.modules: 984 raise Exception('threading is not imported') 985 986 # don't wait until the thread completes 987 ''') 988 rc, out, err = assert_python_ok("-c", code) 989 self.assertEqual(out, b'') 990 self.assertEqual(err, b'') 991 992 993class ThreadJoinOnShutdown(BaseTestCase): 994 995 def _run_and_join(self, script): 996 script = """if 1: 997 import sys, os, time, threading 998 999 # a thread, which waits for the main program to terminate 1000 def joiningfunc(mainthread): 1001 mainthread.join() 1002 print('end of thread') 1003 # stdout is fully buffered because not a tty, we have to flush 1004 # before exit. 1005 sys.stdout.flush() 1006 \n""" + script 1007 1008 rc, out, err = assert_python_ok("-c", script) 1009 data = out.decode().replace('\r', '') 1010 self.assertEqual(data, "end of main\nend of thread\n") 1011 1012 def test_1_join_on_shutdown(self): 1013 # The usual case: on exit, wait for a non-daemon thread 1014 script = """if 1: 1015 import os 1016 t = threading.Thread(target=joiningfunc, 1017 args=(threading.current_thread(),)) 1018 t.start() 1019 time.sleep(0.1) 1020 print('end of main') 1021 """ 1022 self._run_and_join(script) 1023 1024 @support.requires_fork() 1025 @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") 1026 def test_2_join_in_forked_process(self): 1027 # Like the test above, but from a forked interpreter 1028 script = """if 1: 1029 from test import support 1030 1031 childpid = os.fork() 1032 if childpid != 0: 1033 # parent process 1034 support.wait_process(childpid, exitcode=0) 1035 sys.exit(0) 1036 1037 # child process 1038 t = threading.Thread(target=joiningfunc, 1039 args=(threading.current_thread(),)) 1040 t.start() 1041 print('end of main') 1042 """ 1043 self._run_and_join(script) 1044 1045 @support.requires_fork() 1046 @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") 1047 def test_3_join_in_forked_from_thread(self): 1048 # Like the test above, but fork() was called from a worker thread 1049 # In the forked process, the main Thread object must be marked as stopped. 1050 1051 script = """if 1: 1052 from test import support 1053 1054 main_thread = threading.current_thread() 1055 def worker(): 1056 childpid = os.fork() 1057 if childpid != 0: 1058 # parent process 1059 support.wait_process(childpid, exitcode=0) 1060 sys.exit(0) 1061 1062 # child process 1063 t = threading.Thread(target=joiningfunc, 1064 args=(main_thread,)) 1065 print('end of main') 1066 t.start() 1067 t.join() # Should not block: main_thread is already stopped 1068 1069 w = threading.Thread(target=worker) 1070 w.start() 1071 """ 1072 self._run_and_join(script) 1073 1074 @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") 1075 def test_4_daemon_threads(self): 1076 # Check that a daemon thread cannot crash the interpreter on shutdown 1077 # by manipulating internal structures that are being disposed of in 1078 # the main thread. 1079 script = """if True: 1080 import os 1081 import random 1082 import sys 1083 import time 1084 import threading 1085 1086 thread_has_run = set() 1087 1088 def random_io(): 1089 '''Loop for a while sleeping random tiny amounts and doing some I/O.''' 1090 import test.test_threading as mod 1091 while True: 1092 with open(mod.__file__, 'rb') as in_f: 1093 stuff = in_f.read(200) 1094 with open(os.devnull, 'wb') as null_f: 1095 null_f.write(stuff) 1096 time.sleep(random.random() / 1995) 1097 thread_has_run.add(threading.current_thread()) 1098 1099 def main(): 1100 count = 0 1101 for _ in range(40): 1102 new_thread = threading.Thread(target=random_io) 1103 new_thread.daemon = True 1104 new_thread.start() 1105 count += 1 1106 while len(thread_has_run) < count: 1107 time.sleep(0.001) 1108 # Trigger process shutdown 1109 sys.exit(0) 1110 1111 main() 1112 """ 1113 rc, out, err = assert_python_ok('-c', script) 1114 self.assertFalse(err) 1115 1116 @support.requires_fork() 1117 @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") 1118 def test_reinit_tls_after_fork(self): 1119 # Issue #13817: fork() would deadlock in a multithreaded program with 1120 # the ad-hoc TLS implementation. 1121 1122 def do_fork_and_wait(): 1123 # just fork a child process and wait it 1124 pid = os.fork() 1125 if pid > 0: 1126 support.wait_process(pid, exitcode=50) 1127 else: 1128 os._exit(50) 1129 1130 # start a bunch of threads that will fork() child processes 1131 threads = [] 1132 for i in range(16): 1133 t = threading.Thread(target=do_fork_and_wait) 1134 threads.append(t) 1135 t.start() 1136 1137 for t in threads: 1138 t.join() 1139 1140 @support.requires_fork() 1141 def test_clear_threads_states_after_fork(self): 1142 # Issue #17094: check that threads states are cleared after fork() 1143 1144 # start a bunch of threads 1145 threads = [] 1146 for i in range(16): 1147 t = threading.Thread(target=lambda : time.sleep(0.3)) 1148 threads.append(t) 1149 t.start() 1150 1151 pid = os.fork() 1152 if pid == 0: 1153 # check that threads states have been cleared 1154 if len(sys._current_frames()) == 1: 1155 os._exit(51) 1156 else: 1157 os._exit(52) 1158 else: 1159 support.wait_process(pid, exitcode=51) 1160 1161 for t in threads: 1162 t.join() 1163 1164 1165class SubinterpThreadingTests(BaseTestCase): 1166 def pipe(self): 1167 r, w = os.pipe() 1168 self.addCleanup(os.close, r) 1169 self.addCleanup(os.close, w) 1170 if hasattr(os, 'set_blocking'): 1171 os.set_blocking(r, False) 1172 return (r, w) 1173 1174 def test_threads_join(self): 1175 # Non-daemon threads should be joined at subinterpreter shutdown 1176 # (issue #18808) 1177 r, w = self.pipe() 1178 code = textwrap.dedent(r""" 1179 import os 1180 import random 1181 import threading 1182 import time 1183 1184 def random_sleep(): 1185 seconds = random.random() * 0.010 1186 time.sleep(seconds) 1187 1188 def f(): 1189 # Sleep a bit so that the thread is still running when 1190 # Py_EndInterpreter is called. 1191 random_sleep() 1192 os.write(%d, b"x") 1193 1194 threading.Thread(target=f).start() 1195 random_sleep() 1196 """ % (w,)) 1197 ret = test.support.run_in_subinterp(code) 1198 self.assertEqual(ret, 0) 1199 # The thread was joined properly. 1200 self.assertEqual(os.read(r, 1), b"x") 1201 1202 def test_threads_join_2(self): 1203 # Same as above, but a delay gets introduced after the thread's 1204 # Python code returned but before the thread state is deleted. 1205 # To achieve this, we register a thread-local object which sleeps 1206 # a bit when deallocated. 1207 r, w = self.pipe() 1208 code = textwrap.dedent(r""" 1209 import os 1210 import random 1211 import threading 1212 import time 1213 1214 def random_sleep(): 1215 seconds = random.random() * 0.010 1216 time.sleep(seconds) 1217 1218 class Sleeper: 1219 def __del__(self): 1220 random_sleep() 1221 1222 tls = threading.local() 1223 1224 def f(): 1225 # Sleep a bit so that the thread is still running when 1226 # Py_EndInterpreter is called. 1227 random_sleep() 1228 tls.x = Sleeper() 1229 os.write(%d, b"x") 1230 1231 threading.Thread(target=f).start() 1232 random_sleep() 1233 """ % (w,)) 1234 ret = test.support.run_in_subinterp(code) 1235 self.assertEqual(ret, 0) 1236 # The thread was joined properly. 1237 self.assertEqual(os.read(r, 1), b"x") 1238 1239 @cpython_only 1240 def test_daemon_threads_fatal_error(self): 1241 subinterp_code = f"""if 1: 1242 import os 1243 import threading 1244 import time 1245 1246 def f(): 1247 # Make sure the daemon thread is still running when 1248 # Py_EndInterpreter is called. 1249 time.sleep({test.support.SHORT_TIMEOUT}) 1250 threading.Thread(target=f, daemon=True).start() 1251 """ 1252 script = r"""if 1: 1253 import _testcapi 1254 1255 _testcapi.run_in_subinterp(%r) 1256 """ % (subinterp_code,) 1257 with test.support.SuppressCrashReport(): 1258 rc, out, err = assert_python_failure("-c", script) 1259 self.assertIn("Fatal Python error: Py_EndInterpreter: " 1260 "not the last thread", err.decode()) 1261 1262 1263class ThreadingExceptionTests(BaseTestCase): 1264 # A RuntimeError should be raised if Thread.start() is called 1265 # multiple times. 1266 def test_start_thread_again(self): 1267 thread = threading.Thread() 1268 thread.start() 1269 self.assertRaises(RuntimeError, thread.start) 1270 thread.join() 1271 1272 def test_joining_current_thread(self): 1273 current_thread = threading.current_thread() 1274 self.assertRaises(RuntimeError, current_thread.join); 1275 1276 def test_joining_inactive_thread(self): 1277 thread = threading.Thread() 1278 self.assertRaises(RuntimeError, thread.join) 1279 1280 def test_daemonize_active_thread(self): 1281 thread = threading.Thread() 1282 thread.start() 1283 self.assertRaises(RuntimeError, setattr, thread, "daemon", True) 1284 thread.join() 1285 1286 def test_releasing_unacquired_lock(self): 1287 lock = threading.Lock() 1288 self.assertRaises(RuntimeError, lock.release) 1289 1290 @requires_subprocess() 1291 def test_recursion_limit(self): 1292 # Issue 9670 1293 # test that excessive recursion within a non-main thread causes 1294 # an exception rather than crashing the interpreter on platforms 1295 # like Mac OS X or FreeBSD which have small default stack sizes 1296 # for threads 1297 script = """if True: 1298 import threading 1299 1300 def recurse(): 1301 return recurse() 1302 1303 def outer(): 1304 try: 1305 recurse() 1306 except RecursionError: 1307 pass 1308 1309 w = threading.Thread(target=outer) 1310 w.start() 1311 w.join() 1312 print('end of main thread') 1313 """ 1314 expected_output = "end of main thread\n" 1315 p = subprocess.Popen([sys.executable, "-c", script], 1316 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 1317 stdout, stderr = p.communicate() 1318 data = stdout.decode().replace('\r', '') 1319 self.assertEqual(p.returncode, 0, "Unexpected error: " + stderr.decode()) 1320 self.assertEqual(data, expected_output) 1321 1322 def test_print_exception(self): 1323 script = r"""if True: 1324 import threading 1325 import time 1326 1327 running = False 1328 def run(): 1329 global running 1330 running = True 1331 while running: 1332 time.sleep(0.01) 1333 1/0 1334 t = threading.Thread(target=run) 1335 t.start() 1336 while not running: 1337 time.sleep(0.01) 1338 running = False 1339 t.join() 1340 """ 1341 rc, out, err = assert_python_ok("-c", script) 1342 self.assertEqual(out, b'') 1343 err = err.decode() 1344 self.assertIn("Exception in thread", err) 1345 self.assertIn("Traceback (most recent call last):", err) 1346 self.assertIn("ZeroDivisionError", err) 1347 self.assertNotIn("Unhandled exception", err) 1348 1349 def test_print_exception_stderr_is_none_1(self): 1350 script = r"""if True: 1351 import sys 1352 import threading 1353 import time 1354 1355 running = False 1356 def run(): 1357 global running 1358 running = True 1359 while running: 1360 time.sleep(0.01) 1361 1/0 1362 t = threading.Thread(target=run) 1363 t.start() 1364 while not running: 1365 time.sleep(0.01) 1366 sys.stderr = None 1367 running = False 1368 t.join() 1369 """ 1370 rc, out, err = assert_python_ok("-c", script) 1371 self.assertEqual(out, b'') 1372 err = err.decode() 1373 self.assertIn("Exception in thread", err) 1374 self.assertIn("Traceback (most recent call last):", err) 1375 self.assertIn("ZeroDivisionError", err) 1376 self.assertNotIn("Unhandled exception", err) 1377 1378 def test_print_exception_stderr_is_none_2(self): 1379 script = r"""if True: 1380 import sys 1381 import threading 1382 import time 1383 1384 running = False 1385 def run(): 1386 global running 1387 running = True 1388 while running: 1389 time.sleep(0.01) 1390 1/0 1391 sys.stderr = None 1392 t = threading.Thread(target=run) 1393 t.start() 1394 while not running: 1395 time.sleep(0.01) 1396 running = False 1397 t.join() 1398 """ 1399 rc, out, err = assert_python_ok("-c", script) 1400 self.assertEqual(out, b'') 1401 self.assertNotIn("Unhandled exception", err.decode()) 1402 1403 def test_bare_raise_in_brand_new_thread(self): 1404 def bare_raise(): 1405 raise 1406 1407 class Issue27558(threading.Thread): 1408 exc = None 1409 1410 def run(self): 1411 try: 1412 bare_raise() 1413 except Exception as exc: 1414 self.exc = exc 1415 1416 thread = Issue27558() 1417 thread.start() 1418 thread.join() 1419 self.assertIsNotNone(thread.exc) 1420 self.assertIsInstance(thread.exc, RuntimeError) 1421 # explicitly break the reference cycle to not leak a dangling thread 1422 thread.exc = None 1423 1424 def test_multithread_modify_file_noerror(self): 1425 # See issue25872 1426 def modify_file(): 1427 with open(os_helper.TESTFN, 'w', encoding='utf-8') as fp: 1428 fp.write(' ') 1429 traceback.format_stack() 1430 1431 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 1432 threads = [ 1433 threading.Thread(target=modify_file) 1434 for i in range(100) 1435 ] 1436 for t in threads: 1437 t.start() 1438 t.join() 1439 1440 1441class ThreadRunFail(threading.Thread): 1442 def run(self): 1443 raise ValueError("run failed") 1444 1445 1446class ExceptHookTests(BaseTestCase): 1447 def setUp(self): 1448 restore_default_excepthook(self) 1449 super().setUp() 1450 1451 def test_excepthook(self): 1452 with support.captured_output("stderr") as stderr: 1453 thread = ThreadRunFail(name="excepthook thread") 1454 thread.start() 1455 thread.join() 1456 1457 stderr = stderr.getvalue().strip() 1458 self.assertIn(f'Exception in thread {thread.name}:\n', stderr) 1459 self.assertIn('Traceback (most recent call last):\n', stderr) 1460 self.assertIn(' raise ValueError("run failed")', stderr) 1461 self.assertIn('ValueError: run failed', stderr) 1462 1463 @support.cpython_only 1464 def test_excepthook_thread_None(self): 1465 # threading.excepthook called with thread=None: log the thread 1466 # identifier in this case. 1467 with support.captured_output("stderr") as stderr: 1468 try: 1469 raise ValueError("bug") 1470 except Exception as exc: 1471 args = threading.ExceptHookArgs([*sys.exc_info(), None]) 1472 try: 1473 threading.excepthook(args) 1474 finally: 1475 # Explicitly break a reference cycle 1476 args = None 1477 1478 stderr = stderr.getvalue().strip() 1479 self.assertIn(f'Exception in thread {threading.get_ident()}:\n', stderr) 1480 self.assertIn('Traceback (most recent call last):\n', stderr) 1481 self.assertIn(' raise ValueError("bug")', stderr) 1482 self.assertIn('ValueError: bug', stderr) 1483 1484 def test_system_exit(self): 1485 class ThreadExit(threading.Thread): 1486 def run(self): 1487 sys.exit(1) 1488 1489 # threading.excepthook() silently ignores SystemExit 1490 with support.captured_output("stderr") as stderr: 1491 thread = ThreadExit() 1492 thread.start() 1493 thread.join() 1494 1495 self.assertEqual(stderr.getvalue(), '') 1496 1497 def test_custom_excepthook(self): 1498 args = None 1499 1500 def hook(hook_args): 1501 nonlocal args 1502 args = hook_args 1503 1504 try: 1505 with support.swap_attr(threading, 'excepthook', hook): 1506 thread = ThreadRunFail() 1507 thread.start() 1508 thread.join() 1509 1510 self.assertEqual(args.exc_type, ValueError) 1511 self.assertEqual(str(args.exc_value), 'run failed') 1512 self.assertEqual(args.exc_traceback, args.exc_value.__traceback__) 1513 self.assertIs(args.thread, thread) 1514 finally: 1515 # Break reference cycle 1516 args = None 1517 1518 def test_custom_excepthook_fail(self): 1519 def threading_hook(args): 1520 raise ValueError("threading_hook failed") 1521 1522 err_str = None 1523 1524 def sys_hook(exc_type, exc_value, exc_traceback): 1525 nonlocal err_str 1526 err_str = str(exc_value) 1527 1528 with support.swap_attr(threading, 'excepthook', threading_hook), \ 1529 support.swap_attr(sys, 'excepthook', sys_hook), \ 1530 support.captured_output('stderr') as stderr: 1531 thread = ThreadRunFail() 1532 thread.start() 1533 thread.join() 1534 1535 self.assertEqual(stderr.getvalue(), 1536 'Exception in threading.excepthook:\n') 1537 self.assertEqual(err_str, 'threading_hook failed') 1538 1539 def test_original_excepthook(self): 1540 def run_thread(): 1541 with support.captured_output("stderr") as output: 1542 thread = ThreadRunFail(name="excepthook thread") 1543 thread.start() 1544 thread.join() 1545 return output.getvalue() 1546 1547 def threading_hook(args): 1548 print("Running a thread failed", file=sys.stderr) 1549 1550 default_output = run_thread() 1551 with support.swap_attr(threading, 'excepthook', threading_hook): 1552 custom_hook_output = run_thread() 1553 threading.excepthook = threading.__excepthook__ 1554 recovered_output = run_thread() 1555 1556 self.assertEqual(default_output, recovered_output) 1557 self.assertNotEqual(default_output, custom_hook_output) 1558 self.assertEqual(custom_hook_output, "Running a thread failed\n") 1559 1560 1561class TimerTests(BaseTestCase): 1562 1563 def setUp(self): 1564 BaseTestCase.setUp(self) 1565 self.callback_args = [] 1566 self.callback_event = threading.Event() 1567 1568 def test_init_immutable_default_args(self): 1569 # Issue 17435: constructor defaults were mutable objects, they could be 1570 # mutated via the object attributes and affect other Timer objects. 1571 timer1 = threading.Timer(0.01, self._callback_spy) 1572 timer1.start() 1573 self.callback_event.wait() 1574 timer1.args.append("blah") 1575 timer1.kwargs["foo"] = "bar" 1576 self.callback_event.clear() 1577 timer2 = threading.Timer(0.01, self._callback_spy) 1578 timer2.start() 1579 self.callback_event.wait() 1580 self.assertEqual(len(self.callback_args), 2) 1581 self.assertEqual(self.callback_args, [((), {}), ((), {})]) 1582 timer1.join() 1583 timer2.join() 1584 1585 def _callback_spy(self, *args, **kwargs): 1586 self.callback_args.append((args[:], kwargs.copy())) 1587 self.callback_event.set() 1588 1589class LockTests(lock_tests.LockTests): 1590 locktype = staticmethod(threading.Lock) 1591 1592class PyRLockTests(lock_tests.RLockTests): 1593 locktype = staticmethod(threading._PyRLock) 1594 1595@unittest.skipIf(threading._CRLock is None, 'RLock not implemented in C') 1596class CRLockTests(lock_tests.RLockTests): 1597 locktype = staticmethod(threading._CRLock) 1598 1599class EventTests(lock_tests.EventTests): 1600 eventtype = staticmethod(threading.Event) 1601 1602class ConditionAsRLockTests(lock_tests.RLockTests): 1603 # Condition uses an RLock by default and exports its API. 1604 locktype = staticmethod(threading.Condition) 1605 1606class ConditionTests(lock_tests.ConditionTests): 1607 condtype = staticmethod(threading.Condition) 1608 1609class SemaphoreTests(lock_tests.SemaphoreTests): 1610 semtype = staticmethod(threading.Semaphore) 1611 1612class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests): 1613 semtype = staticmethod(threading.BoundedSemaphore) 1614 1615class BarrierTests(lock_tests.BarrierTests): 1616 barriertype = staticmethod(threading.Barrier) 1617 1618 1619class MiscTestCase(unittest.TestCase): 1620 def test__all__(self): 1621 restore_default_excepthook(self) 1622 1623 extra = {"ThreadError"} 1624 not_exported = {'currentThread', 'activeCount'} 1625 support.check__all__(self, threading, ('threading', '_thread'), 1626 extra=extra, not_exported=not_exported) 1627 1628 1629class InterruptMainTests(unittest.TestCase): 1630 def check_interrupt_main_with_signal_handler(self, signum): 1631 def handler(signum, frame): 1632 1/0 1633 1634 old_handler = signal.signal(signum, handler) 1635 self.addCleanup(signal.signal, signum, old_handler) 1636 1637 with self.assertRaises(ZeroDivisionError): 1638 _thread.interrupt_main() 1639 1640 def check_interrupt_main_noerror(self, signum): 1641 handler = signal.getsignal(signum) 1642 try: 1643 # No exception should arise. 1644 signal.signal(signum, signal.SIG_IGN) 1645 _thread.interrupt_main(signum) 1646 1647 signal.signal(signum, signal.SIG_DFL) 1648 _thread.interrupt_main(signum) 1649 finally: 1650 # Restore original handler 1651 signal.signal(signum, handler) 1652 1653 def test_interrupt_main_subthread(self): 1654 # Calling start_new_thread with a function that executes interrupt_main 1655 # should raise KeyboardInterrupt upon completion. 1656 def call_interrupt(): 1657 _thread.interrupt_main() 1658 t = threading.Thread(target=call_interrupt) 1659 with self.assertRaises(KeyboardInterrupt): 1660 t.start() 1661 t.join() 1662 t.join() 1663 1664 def test_interrupt_main_mainthread(self): 1665 # Make sure that if interrupt_main is called in main thread that 1666 # KeyboardInterrupt is raised instantly. 1667 with self.assertRaises(KeyboardInterrupt): 1668 _thread.interrupt_main() 1669 1670 def test_interrupt_main_with_signal_handler(self): 1671 self.check_interrupt_main_with_signal_handler(signal.SIGINT) 1672 self.check_interrupt_main_with_signal_handler(signal.SIGTERM) 1673 1674 def test_interrupt_main_noerror(self): 1675 self.check_interrupt_main_noerror(signal.SIGINT) 1676 self.check_interrupt_main_noerror(signal.SIGTERM) 1677 1678 def test_interrupt_main_invalid_signal(self): 1679 self.assertRaises(ValueError, _thread.interrupt_main, -1) 1680 self.assertRaises(ValueError, _thread.interrupt_main, signal.NSIG) 1681 self.assertRaises(ValueError, _thread.interrupt_main, 1000000) 1682 1683 @threading_helper.reap_threads 1684 def test_can_interrupt_tight_loops(self): 1685 cont = [True] 1686 started = [False] 1687 interrupted = [False] 1688 1689 def worker(started, cont, interrupted): 1690 iterations = 100_000_000 1691 started[0] = True 1692 while cont[0]: 1693 if iterations: 1694 iterations -= 1 1695 else: 1696 return 1697 pass 1698 interrupted[0] = True 1699 1700 t = threading.Thread(target=worker,args=(started, cont, interrupted)) 1701 t.start() 1702 while not started[0]: 1703 pass 1704 cont[0] = False 1705 t.join() 1706 self.assertTrue(interrupted[0]) 1707 1708 1709class AtexitTests(unittest.TestCase): 1710 1711 def test_atexit_output(self): 1712 rc, out, err = assert_python_ok("-c", """if True: 1713 import threading 1714 1715 def run_last(): 1716 print('parrot') 1717 1718 threading._register_atexit(run_last) 1719 """) 1720 1721 self.assertFalse(err) 1722 self.assertEqual(out.strip(), b'parrot') 1723 1724 def test_atexit_called_once(self): 1725 rc, out, err = assert_python_ok("-c", """if True: 1726 import threading 1727 from unittest.mock import Mock 1728 1729 mock = Mock() 1730 threading._register_atexit(mock) 1731 mock.assert_not_called() 1732 # force early shutdown to ensure it was called once 1733 threading._shutdown() 1734 mock.assert_called_once() 1735 """) 1736 1737 self.assertFalse(err) 1738 1739 def test_atexit_after_shutdown(self): 1740 # The only way to do this is by registering an atexit within 1741 # an atexit, which is intended to raise an exception. 1742 rc, out, err = assert_python_ok("-c", """if True: 1743 import threading 1744 1745 def func(): 1746 pass 1747 1748 def run_last(): 1749 threading._register_atexit(func) 1750 1751 threading._register_atexit(run_last) 1752 """) 1753 1754 self.assertTrue(err) 1755 self.assertIn("RuntimeError: can't register atexit after shutdown", 1756 err.decode()) 1757 1758 1759if __name__ == "__main__": 1760 unittest.main() 1761