1# As a test suite for the os module, this is woefully inadequate, but this 2# does add tests for a few functions which have been determined to be more 3# portable than they had been thought to be. 4 5import asyncio 6import codecs 7import contextlib 8import decimal 9import errno 10import fnmatch 11import fractions 12import itertools 13import locale 14import os 15import pickle 16import select 17import shutil 18import signal 19import socket 20import stat 21import struct 22import subprocess 23import sys 24import sysconfig 25import tempfile 26import textwrap 27import time 28import types 29import unittest 30import uuid 31import warnings 32from test import support 33from test.support import import_helper 34from test.support import os_helper 35from test.support import socket_helper 36from test.support import warnings_helper 37from platform import win32_is_iot 38 39try: 40 import resource 41except ImportError: 42 resource = None 43try: 44 import fcntl 45except ImportError: 46 fcntl = None 47try: 48 import _winapi 49except ImportError: 50 _winapi = None 51try: 52 import pwd 53 all_users = [u.pw_uid for u in pwd.getpwall()] 54except (ImportError, AttributeError): 55 all_users = [] 56try: 57 from _testcapi import INT_MAX, PY_SSIZE_T_MAX 58except ImportError: 59 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize 60 61try: 62 import mmap 63except ImportError: 64 mmap = None 65 66from test.support.script_helper import assert_python_ok 67from test.support import unix_shell 68from test.support.os_helper import FakePath 69 70 71root_in_posix = False 72if hasattr(os, 'geteuid'): 73 root_in_posix = (os.geteuid() == 0) 74 75# Detect whether we're on a Linux system that uses the (now outdated 76# and unmaintained) linuxthreads threading library. There's an issue 77# when combining linuxthreads with a failed execv call: see 78# http://bugs.python.org/issue4970. 79if hasattr(sys, 'thread_info') and sys.thread_info.version: 80 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads") 81else: 82 USING_LINUXTHREADS = False 83 84# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group. 85HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0 86 87 88def requires_os_func(name): 89 return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name) 90 91 92def create_file(filename, content=b'content'): 93 with open(filename, "xb", 0) as fp: 94 fp.write(content) 95 96 97# bpo-41625: On AIX, splice() only works with a socket, not with a pipe. 98requires_splice_pipe = unittest.skipIf(sys.platform.startswith("aix"), 99 'on AIX, splice() only accepts sockets') 100 101 102def tearDownModule(): 103 asyncio.set_event_loop_policy(None) 104 105 106class MiscTests(unittest.TestCase): 107 def test_getcwd(self): 108 cwd = os.getcwd() 109 self.assertIsInstance(cwd, str) 110 111 def test_getcwd_long_path(self): 112 # bpo-37412: On Linux, PATH_MAX is usually around 4096 bytes. On 113 # Windows, MAX_PATH is defined as 260 characters, but Windows supports 114 # longer path if longer paths support is enabled. Internally, the os 115 # module uses MAXPATHLEN which is at least 1024. 116 # 117 # Use a directory name of 200 characters to fit into Windows MAX_PATH 118 # limit. 119 # 120 # On Windows, the test can stop when trying to create a path longer 121 # than MAX_PATH if long paths support is disabled: 122 # see RtlAreLongPathsEnabled(). 123 min_len = 2000 # characters 124 # On VxWorks, PATH_MAX is defined as 1024 bytes. Creating a path 125 # longer than PATH_MAX will fail. 126 if sys.platform == 'vxworks': 127 min_len = 1000 128 dirlen = 200 # characters 129 dirname = 'python_test_dir_' 130 dirname = dirname + ('a' * (dirlen - len(dirname))) 131 132 with tempfile.TemporaryDirectory() as tmpdir: 133 with os_helper.change_cwd(tmpdir) as path: 134 expected = path 135 136 while True: 137 cwd = os.getcwd() 138 self.assertEqual(cwd, expected) 139 140 need = min_len - (len(cwd) + len(os.path.sep)) 141 if need <= 0: 142 break 143 if len(dirname) > need and need > 0: 144 dirname = dirname[:need] 145 146 path = os.path.join(path, dirname) 147 try: 148 os.mkdir(path) 149 # On Windows, chdir() can fail 150 # even if mkdir() succeeded 151 os.chdir(path) 152 except FileNotFoundError: 153 # On Windows, catch ERROR_PATH_NOT_FOUND (3) and 154 # ERROR_FILENAME_EXCED_RANGE (206) errors 155 # ("The filename or extension is too long") 156 break 157 except OSError as exc: 158 if exc.errno == errno.ENAMETOOLONG: 159 break 160 else: 161 raise 162 163 expected = path 164 165 if support.verbose: 166 print(f"Tested current directory length: {len(cwd)}") 167 168 def test_getcwdb(self): 169 cwd = os.getcwdb() 170 self.assertIsInstance(cwd, bytes) 171 self.assertEqual(os.fsdecode(cwd), os.getcwd()) 172 173 174# Tests creating TESTFN 175class FileTests(unittest.TestCase): 176 def setUp(self): 177 if os.path.lexists(os_helper.TESTFN): 178 os.unlink(os_helper.TESTFN) 179 tearDown = setUp 180 181 def test_access(self): 182 f = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR) 183 os.close(f) 184 self.assertTrue(os.access(os_helper.TESTFN, os.W_OK)) 185 186 @unittest.skipIf( 187 support.is_emscripten, "Test is unstable under Emscripten." 188 ) 189 @unittest.skipIf( 190 support.is_wasi, "WASI does not support dup." 191 ) 192 def test_closerange(self): 193 first = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR) 194 # We must allocate two consecutive file descriptors, otherwise 195 # it will mess up other file descriptors (perhaps even the three 196 # standard ones). 197 second = os.dup(first) 198 try: 199 retries = 0 200 while second != first + 1: 201 os.close(first) 202 retries += 1 203 if retries > 10: 204 # XXX test skipped 205 self.skipTest("couldn't allocate two consecutive fds") 206 first, second = second, os.dup(second) 207 finally: 208 os.close(second) 209 # close a fd that is open, and one that isn't 210 os.closerange(first, first + 2) 211 self.assertRaises(OSError, os.write, first, b"a") 212 213 @support.cpython_only 214 def test_rename(self): 215 path = os_helper.TESTFN 216 old = sys.getrefcount(path) 217 self.assertRaises(TypeError, os.rename, path, 0) 218 new = sys.getrefcount(path) 219 self.assertEqual(old, new) 220 221 def test_read(self): 222 with open(os_helper.TESTFN, "w+b") as fobj: 223 fobj.write(b"spam") 224 fobj.flush() 225 fd = fobj.fileno() 226 os.lseek(fd, 0, 0) 227 s = os.read(fd, 4) 228 self.assertEqual(type(s), bytes) 229 self.assertEqual(s, b"spam") 230 231 @support.cpython_only 232 # Skip the test on 32-bit platforms: the number of bytes must fit in a 233 # Py_ssize_t type 234 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX, 235 "needs INT_MAX < PY_SSIZE_T_MAX") 236 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False) 237 def test_large_read(self, size): 238 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 239 create_file(os_helper.TESTFN, b'test') 240 241 # Issue #21932: Make sure that os.read() does not raise an 242 # OverflowError for size larger than INT_MAX 243 with open(os_helper.TESTFN, "rb") as fp: 244 data = os.read(fp.fileno(), size) 245 246 # The test does not try to read more than 2 GiB at once because the 247 # operating system is free to return less bytes than requested. 248 self.assertEqual(data, b'test') 249 250 def test_write(self): 251 # os.write() accepts bytes- and buffer-like objects but not strings 252 fd = os.open(os_helper.TESTFN, os.O_CREAT | os.O_WRONLY) 253 self.assertRaises(TypeError, os.write, fd, "beans") 254 os.write(fd, b"bacon\n") 255 os.write(fd, bytearray(b"eggs\n")) 256 os.write(fd, memoryview(b"spam\n")) 257 os.close(fd) 258 with open(os_helper.TESTFN, "rb") as fobj: 259 self.assertEqual(fobj.read().splitlines(), 260 [b"bacon", b"eggs", b"spam"]) 261 262 def write_windows_console(self, *args): 263 retcode = subprocess.call(args, 264 # use a new console to not flood the test output 265 creationflags=subprocess.CREATE_NEW_CONSOLE, 266 # use a shell to hide the console window (SW_HIDE) 267 shell=True) 268 self.assertEqual(retcode, 0) 269 270 @unittest.skipUnless(sys.platform == 'win32', 271 'test specific to the Windows console') 272 def test_write_windows_console(self): 273 # Issue #11395: the Windows console returns an error (12: not enough 274 # space error) on writing into stdout if stdout mode is binary and the 275 # length is greater than 66,000 bytes (or less, depending on heap 276 # usage). 277 code = "print('x' * 100000)" 278 self.write_windows_console(sys.executable, "-c", code) 279 self.write_windows_console(sys.executable, "-u", "-c", code) 280 281 def fdopen_helper(self, *args): 282 fd = os.open(os_helper.TESTFN, os.O_RDONLY) 283 f = os.fdopen(fd, *args, encoding="utf-8") 284 f.close() 285 286 def test_fdopen(self): 287 fd = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR) 288 os.close(fd) 289 290 self.fdopen_helper() 291 self.fdopen_helper('r') 292 self.fdopen_helper('r', 100) 293 294 def test_replace(self): 295 TESTFN2 = os_helper.TESTFN + ".2" 296 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 297 self.addCleanup(os_helper.unlink, TESTFN2) 298 299 create_file(os_helper.TESTFN, b"1") 300 create_file(TESTFN2, b"2") 301 302 os.replace(os_helper.TESTFN, TESTFN2) 303 self.assertRaises(FileNotFoundError, os.stat, os_helper.TESTFN) 304 with open(TESTFN2, 'r', encoding='utf-8') as f: 305 self.assertEqual(f.read(), "1") 306 307 def test_open_keywords(self): 308 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777, 309 dir_fd=None) 310 os.close(f) 311 312 def test_symlink_keywords(self): 313 symlink = support.get_attribute(os, "symlink") 314 try: 315 symlink(src='target', dst=os_helper.TESTFN, 316 target_is_directory=False, dir_fd=None) 317 except (NotImplementedError, OSError): 318 pass # No OS support or unprivileged user 319 320 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()') 321 def test_copy_file_range_invalid_values(self): 322 with self.assertRaises(ValueError): 323 os.copy_file_range(0, 1, -10) 324 325 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()') 326 def test_copy_file_range(self): 327 TESTFN2 = os_helper.TESTFN + ".3" 328 data = b'0123456789' 329 330 create_file(os_helper.TESTFN, data) 331 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 332 333 in_file = open(os_helper.TESTFN, 'rb') 334 self.addCleanup(in_file.close) 335 in_fd = in_file.fileno() 336 337 out_file = open(TESTFN2, 'w+b') 338 self.addCleanup(os_helper.unlink, TESTFN2) 339 self.addCleanup(out_file.close) 340 out_fd = out_file.fileno() 341 342 try: 343 i = os.copy_file_range(in_fd, out_fd, 5) 344 except OSError as e: 345 # Handle the case in which Python was compiled 346 # in a system with the syscall but without support 347 # in the kernel. 348 if e.errno != errno.ENOSYS: 349 raise 350 self.skipTest(e) 351 else: 352 # The number of copied bytes can be less than 353 # the number of bytes originally requested. 354 self.assertIn(i, range(0, 6)); 355 356 with open(TESTFN2, 'rb') as in_file: 357 self.assertEqual(in_file.read(), data[:i]) 358 359 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()') 360 def test_copy_file_range_offset(self): 361 TESTFN4 = os_helper.TESTFN + ".4" 362 data = b'0123456789' 363 bytes_to_copy = 6 364 in_skip = 3 365 out_seek = 5 366 367 create_file(os_helper.TESTFN, data) 368 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 369 370 in_file = open(os_helper.TESTFN, 'rb') 371 self.addCleanup(in_file.close) 372 in_fd = in_file.fileno() 373 374 out_file = open(TESTFN4, 'w+b') 375 self.addCleanup(os_helper.unlink, TESTFN4) 376 self.addCleanup(out_file.close) 377 out_fd = out_file.fileno() 378 379 try: 380 i = os.copy_file_range(in_fd, out_fd, bytes_to_copy, 381 offset_src=in_skip, 382 offset_dst=out_seek) 383 except OSError as e: 384 # Handle the case in which Python was compiled 385 # in a system with the syscall but without support 386 # in the kernel. 387 if e.errno != errno.ENOSYS: 388 raise 389 self.skipTest(e) 390 else: 391 # The number of copied bytes can be less than 392 # the number of bytes originally requested. 393 self.assertIn(i, range(0, bytes_to_copy+1)); 394 395 with open(TESTFN4, 'rb') as in_file: 396 read = in_file.read() 397 # seeked bytes (5) are zero'ed 398 self.assertEqual(read[:out_seek], b'\x00'*out_seek) 399 # 012 are skipped (in_skip) 400 # 345678 are copied in the file (in_skip + bytes_to_copy) 401 self.assertEqual(read[out_seek:], 402 data[in_skip:in_skip+i]) 403 404 @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()') 405 def test_splice_invalid_values(self): 406 with self.assertRaises(ValueError): 407 os.splice(0, 1, -10) 408 409 @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()') 410 @requires_splice_pipe 411 def test_splice(self): 412 TESTFN2 = os_helper.TESTFN + ".3" 413 data = b'0123456789' 414 415 create_file(os_helper.TESTFN, data) 416 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 417 418 in_file = open(os_helper.TESTFN, 'rb') 419 self.addCleanup(in_file.close) 420 in_fd = in_file.fileno() 421 422 read_fd, write_fd = os.pipe() 423 self.addCleanup(lambda: os.close(read_fd)) 424 self.addCleanup(lambda: os.close(write_fd)) 425 426 try: 427 i = os.splice(in_fd, write_fd, 5) 428 except OSError as e: 429 # Handle the case in which Python was compiled 430 # in a system with the syscall but without support 431 # in the kernel. 432 if e.errno != errno.ENOSYS: 433 raise 434 self.skipTest(e) 435 else: 436 # The number of copied bytes can be less than 437 # the number of bytes originally requested. 438 self.assertIn(i, range(0, 6)); 439 440 self.assertEqual(os.read(read_fd, 100), data[:i]) 441 442 @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()') 443 @requires_splice_pipe 444 def test_splice_offset_in(self): 445 TESTFN4 = os_helper.TESTFN + ".4" 446 data = b'0123456789' 447 bytes_to_copy = 6 448 in_skip = 3 449 450 create_file(os_helper.TESTFN, data) 451 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 452 453 in_file = open(os_helper.TESTFN, 'rb') 454 self.addCleanup(in_file.close) 455 in_fd = in_file.fileno() 456 457 read_fd, write_fd = os.pipe() 458 self.addCleanup(lambda: os.close(read_fd)) 459 self.addCleanup(lambda: os.close(write_fd)) 460 461 try: 462 i = os.splice(in_fd, write_fd, bytes_to_copy, offset_src=in_skip) 463 except OSError as e: 464 # Handle the case in which Python was compiled 465 # in a system with the syscall but without support 466 # in the kernel. 467 if e.errno != errno.ENOSYS: 468 raise 469 self.skipTest(e) 470 else: 471 # The number of copied bytes can be less than 472 # the number of bytes originally requested. 473 self.assertIn(i, range(0, bytes_to_copy+1)); 474 475 read = os.read(read_fd, 100) 476 # 012 are skipped (in_skip) 477 # 345678 are copied in the file (in_skip + bytes_to_copy) 478 self.assertEqual(read, data[in_skip:in_skip+i]) 479 480 @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()') 481 @requires_splice_pipe 482 def test_splice_offset_out(self): 483 TESTFN4 = os_helper.TESTFN + ".4" 484 data = b'0123456789' 485 bytes_to_copy = 6 486 out_seek = 3 487 488 create_file(os_helper.TESTFN, data) 489 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 490 491 read_fd, write_fd = os.pipe() 492 self.addCleanup(lambda: os.close(read_fd)) 493 self.addCleanup(lambda: os.close(write_fd)) 494 os.write(write_fd, data) 495 496 out_file = open(TESTFN4, 'w+b') 497 self.addCleanup(os_helper.unlink, TESTFN4) 498 self.addCleanup(out_file.close) 499 out_fd = out_file.fileno() 500 501 try: 502 i = os.splice(read_fd, out_fd, bytes_to_copy, offset_dst=out_seek) 503 except OSError as e: 504 # Handle the case in which Python was compiled 505 # in a system with the syscall but without support 506 # in the kernel. 507 if e.errno != errno.ENOSYS: 508 raise 509 self.skipTest(e) 510 else: 511 # The number of copied bytes can be less than 512 # the number of bytes originally requested. 513 self.assertIn(i, range(0, bytes_to_copy+1)); 514 515 with open(TESTFN4, 'rb') as in_file: 516 read = in_file.read() 517 # seeked bytes (5) are zero'ed 518 self.assertEqual(read[:out_seek], b'\x00'*out_seek) 519 # 012 are skipped (in_skip) 520 # 345678 are copied in the file (in_skip + bytes_to_copy) 521 self.assertEqual(read[out_seek:], data[:i]) 522 523 524# Test attributes on return values from os.*stat* family. 525class StatAttributeTests(unittest.TestCase): 526 def setUp(self): 527 self.fname = os_helper.TESTFN 528 self.addCleanup(os_helper.unlink, self.fname) 529 create_file(self.fname, b"ABC") 530 531 def check_stat_attributes(self, fname): 532 result = os.stat(fname) 533 534 # Make sure direct access works 535 self.assertEqual(result[stat.ST_SIZE], 3) 536 self.assertEqual(result.st_size, 3) 537 538 # Make sure all the attributes are there 539 members = dir(result) 540 for name in dir(stat): 541 if name[:3] == 'ST_': 542 attr = name.lower() 543 if name.endswith("TIME"): 544 def trunc(x): return int(x) 545 else: 546 def trunc(x): return x 547 self.assertEqual(trunc(getattr(result, attr)), 548 result[getattr(stat, name)]) 549 self.assertIn(attr, members) 550 551 # Make sure that the st_?time and st_?time_ns fields roughly agree 552 # (they should always agree up to around tens-of-microseconds) 553 for name in 'st_atime st_mtime st_ctime'.split(): 554 floaty = int(getattr(result, name) * 100000) 555 nanosecondy = getattr(result, name + "_ns") // 10000 556 self.assertAlmostEqual(floaty, nanosecondy, delta=2) 557 558 try: 559 result[200] 560 self.fail("No exception raised") 561 except IndexError: 562 pass 563 564 # Make sure that assignment fails 565 try: 566 result.st_mode = 1 567 self.fail("No exception raised") 568 except AttributeError: 569 pass 570 571 try: 572 result.st_rdev = 1 573 self.fail("No exception raised") 574 except (AttributeError, TypeError): 575 pass 576 577 try: 578 result.parrot = 1 579 self.fail("No exception raised") 580 except AttributeError: 581 pass 582 583 # Use the stat_result constructor with a too-short tuple. 584 try: 585 result2 = os.stat_result((10,)) 586 self.fail("No exception raised") 587 except TypeError: 588 pass 589 590 # Use the constructor with a too-long tuple. 591 try: 592 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14)) 593 except TypeError: 594 pass 595 596 def test_stat_attributes(self): 597 self.check_stat_attributes(self.fname) 598 599 def test_stat_attributes_bytes(self): 600 try: 601 fname = self.fname.encode(sys.getfilesystemencoding()) 602 except UnicodeEncodeError: 603 self.skipTest("cannot encode %a for the filesystem" % self.fname) 604 self.check_stat_attributes(fname) 605 606 def test_stat_result_pickle(self): 607 result = os.stat(self.fname) 608 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 609 p = pickle.dumps(result, proto) 610 self.assertIn(b'stat_result', p) 611 if proto < 4: 612 self.assertIn(b'cos\nstat_result\n', p) 613 unpickled = pickle.loads(p) 614 self.assertEqual(result, unpickled) 615 616 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()') 617 def test_statvfs_attributes(self): 618 result = os.statvfs(self.fname) 619 620 # Make sure direct access works 621 self.assertEqual(result.f_bfree, result[3]) 622 623 # Make sure all the attributes are there. 624 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files', 625 'ffree', 'favail', 'flag', 'namemax') 626 for value, member in enumerate(members): 627 self.assertEqual(getattr(result, 'f_' + member), result[value]) 628 629 self.assertTrue(isinstance(result.f_fsid, int)) 630 631 # Test that the size of the tuple doesn't change 632 self.assertEqual(len(result), 10) 633 634 # Make sure that assignment really fails 635 try: 636 result.f_bfree = 1 637 self.fail("No exception raised") 638 except AttributeError: 639 pass 640 641 try: 642 result.parrot = 1 643 self.fail("No exception raised") 644 except AttributeError: 645 pass 646 647 # Use the constructor with a too-short tuple. 648 try: 649 result2 = os.statvfs_result((10,)) 650 self.fail("No exception raised") 651 except TypeError: 652 pass 653 654 # Use the constructor with a too-long tuple. 655 try: 656 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14)) 657 except TypeError: 658 pass 659 660 @unittest.skipUnless(hasattr(os, 'statvfs'), 661 "need os.statvfs()") 662 def test_statvfs_result_pickle(self): 663 result = os.statvfs(self.fname) 664 665 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 666 p = pickle.dumps(result, proto) 667 self.assertIn(b'statvfs_result', p) 668 if proto < 4: 669 self.assertIn(b'cos\nstatvfs_result\n', p) 670 unpickled = pickle.loads(p) 671 self.assertEqual(result, unpickled) 672 673 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 674 def test_1686475(self): 675 # Verify that an open file can be stat'ed 676 try: 677 os.stat(r"c:\pagefile.sys") 678 except FileNotFoundError: 679 self.skipTest(r'c:\pagefile.sys does not exist') 680 except OSError as e: 681 self.fail("Could not stat pagefile.sys") 682 683 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 684 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") 685 def test_15261(self): 686 # Verify that stat'ing a closed fd does not cause crash 687 r, w = os.pipe() 688 try: 689 os.stat(r) # should not raise error 690 finally: 691 os.close(r) 692 os.close(w) 693 with self.assertRaises(OSError) as ctx: 694 os.stat(r) 695 self.assertEqual(ctx.exception.errno, errno.EBADF) 696 697 def check_file_attributes(self, result): 698 self.assertTrue(hasattr(result, 'st_file_attributes')) 699 self.assertTrue(isinstance(result.st_file_attributes, int)) 700 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF) 701 702 @unittest.skipUnless(sys.platform == "win32", 703 "st_file_attributes is Win32 specific") 704 def test_file_attributes(self): 705 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set) 706 result = os.stat(self.fname) 707 self.check_file_attributes(result) 708 self.assertEqual( 709 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY, 710 0) 711 712 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set) 713 dirname = os_helper.TESTFN + "dir" 714 os.mkdir(dirname) 715 self.addCleanup(os.rmdir, dirname) 716 717 result = os.stat(dirname) 718 self.check_file_attributes(result) 719 self.assertEqual( 720 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY, 721 stat.FILE_ATTRIBUTE_DIRECTORY) 722 723 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 724 def test_access_denied(self): 725 # Default to FindFirstFile WIN32_FIND_DATA when access is 726 # denied. See issue 28075. 727 # os.environ['TEMP'] should be located on a volume that 728 # supports file ACLs. 729 fname = os.path.join(os.environ['TEMP'], self.fname) 730 self.addCleanup(os_helper.unlink, fname) 731 create_file(fname, b'ABC') 732 # Deny the right to [S]YNCHRONIZE on the file to 733 # force CreateFile to fail with ERROR_ACCESS_DENIED. 734 DETACHED_PROCESS = 8 735 subprocess.check_call( 736 # bpo-30584: Use security identifier *S-1-5-32-545 instead 737 # of localized "Users" to not depend on the locale. 738 ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'], 739 creationflags=DETACHED_PROCESS 740 ) 741 result = os.stat(fname) 742 self.assertNotEqual(result.st_size, 0) 743 744 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 745 def test_stat_block_device(self): 746 # bpo-38030: os.stat fails for block devices 747 # Test a filename like "//./C:" 748 fname = "//./" + os.path.splitdrive(os.getcwd())[0] 749 result = os.stat(fname) 750 self.assertEqual(result.st_mode, stat.S_IFBLK) 751 752 753class UtimeTests(unittest.TestCase): 754 def setUp(self): 755 self.dirname = os_helper.TESTFN 756 self.fname = os.path.join(self.dirname, "f1") 757 758 self.addCleanup(os_helper.rmtree, self.dirname) 759 os.mkdir(self.dirname) 760 create_file(self.fname) 761 762 def support_subsecond(self, filename): 763 # Heuristic to check if the filesystem supports timestamp with 764 # subsecond resolution: check if float and int timestamps are different 765 st = os.stat(filename) 766 return ((st.st_atime != st[7]) 767 or (st.st_mtime != st[8]) 768 or (st.st_ctime != st[9])) 769 770 def _test_utime(self, set_time, filename=None): 771 if not filename: 772 filename = self.fname 773 774 support_subsecond = self.support_subsecond(filename) 775 if support_subsecond: 776 # Timestamp with a resolution of 1 microsecond (10^-6). 777 # 778 # The resolution of the C internal function used by os.utime() 779 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable 780 # test with a resolution of 1 ns requires more work: 781 # see the issue #15745. 782 atime_ns = 1002003000 # 1.002003 seconds 783 mtime_ns = 4005006000 # 4.005006 seconds 784 else: 785 # use a resolution of 1 second 786 atime_ns = 5 * 10**9 787 mtime_ns = 8 * 10**9 788 789 set_time(filename, (atime_ns, mtime_ns)) 790 st = os.stat(filename) 791 792 if support_subsecond: 793 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6) 794 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6) 795 else: 796 self.assertEqual(st.st_atime, atime_ns * 1e-9) 797 self.assertEqual(st.st_mtime, mtime_ns * 1e-9) 798 self.assertEqual(st.st_atime_ns, atime_ns) 799 self.assertEqual(st.st_mtime_ns, mtime_ns) 800 801 def test_utime(self): 802 def set_time(filename, ns): 803 # test the ns keyword parameter 804 os.utime(filename, ns=ns) 805 self._test_utime(set_time) 806 807 @staticmethod 808 def ns_to_sec(ns): 809 # Convert a number of nanosecond (int) to a number of seconds (float). 810 # Round towards infinity by adding 0.5 nanosecond to avoid rounding 811 # issue, os.utime() rounds towards minus infinity. 812 return (ns * 1e-9) + 0.5e-9 813 814 def test_utime_by_indexed(self): 815 # pass times as floating point seconds as the second indexed parameter 816 def set_time(filename, ns): 817 atime_ns, mtime_ns = ns 818 atime = self.ns_to_sec(atime_ns) 819 mtime = self.ns_to_sec(mtime_ns) 820 # test utimensat(timespec), utimes(timeval), utime(utimbuf) 821 # or utime(time_t) 822 os.utime(filename, (atime, mtime)) 823 self._test_utime(set_time) 824 825 def test_utime_by_times(self): 826 def set_time(filename, ns): 827 atime_ns, mtime_ns = ns 828 atime = self.ns_to_sec(atime_ns) 829 mtime = self.ns_to_sec(mtime_ns) 830 # test the times keyword parameter 831 os.utime(filename, times=(atime, mtime)) 832 self._test_utime(set_time) 833 834 @unittest.skipUnless(os.utime in os.supports_follow_symlinks, 835 "follow_symlinks support for utime required " 836 "for this test.") 837 def test_utime_nofollow_symlinks(self): 838 def set_time(filename, ns): 839 # use follow_symlinks=False to test utimensat(timespec) 840 # or lutimes(timeval) 841 os.utime(filename, ns=ns, follow_symlinks=False) 842 self._test_utime(set_time) 843 844 @unittest.skipUnless(os.utime in os.supports_fd, 845 "fd support for utime required for this test.") 846 def test_utime_fd(self): 847 def set_time(filename, ns): 848 with open(filename, 'wb', 0) as fp: 849 # use a file descriptor to test futimens(timespec) 850 # or futimes(timeval) 851 os.utime(fp.fileno(), ns=ns) 852 self._test_utime(set_time) 853 854 @unittest.skipUnless(os.utime in os.supports_dir_fd, 855 "dir_fd support for utime required for this test.") 856 def test_utime_dir_fd(self): 857 def set_time(filename, ns): 858 dirname, name = os.path.split(filename) 859 with os_helper.open_dir_fd(dirname) as dirfd: 860 # pass dir_fd to test utimensat(timespec) or futimesat(timeval) 861 os.utime(name, dir_fd=dirfd, ns=ns) 862 self._test_utime(set_time) 863 864 def test_utime_directory(self): 865 def set_time(filename, ns): 866 # test calling os.utime() on a directory 867 os.utime(filename, ns=ns) 868 self._test_utime(set_time, filename=self.dirname) 869 870 def _test_utime_current(self, set_time): 871 # Get the system clock 872 current = time.time() 873 874 # Call os.utime() to set the timestamp to the current system clock 875 set_time(self.fname) 876 877 if not self.support_subsecond(self.fname): 878 delta = 1.0 879 else: 880 # On Windows, the usual resolution of time.time() is 15.6 ms. 881 # bpo-30649: Tolerate 50 ms for slow Windows buildbots. 882 # 883 # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use 884 # also 50 ms on other platforms. 885 delta = 0.050 886 st = os.stat(self.fname) 887 msg = ("st_time=%r, current=%r, dt=%r" 888 % (st.st_mtime, current, st.st_mtime - current)) 889 self.assertAlmostEqual(st.st_mtime, current, 890 delta=delta, msg=msg) 891 892 def test_utime_current(self): 893 def set_time(filename): 894 # Set to the current time in the new way 895 os.utime(self.fname) 896 self._test_utime_current(set_time) 897 898 def test_utime_current_old(self): 899 def set_time(filename): 900 # Set to the current time in the old explicit way. 901 os.utime(self.fname, None) 902 self._test_utime_current(set_time) 903 904 def get_file_system(self, path): 905 if sys.platform == 'win32': 906 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\' 907 import ctypes 908 kernel32 = ctypes.windll.kernel32 909 buf = ctypes.create_unicode_buffer("", 100) 910 ok = kernel32.GetVolumeInformationW(root, None, 0, 911 None, None, None, 912 buf, len(buf)) 913 if ok: 914 return buf.value 915 # return None if the filesystem is unknown 916 917 def test_large_time(self): 918 # Many filesystems are limited to the year 2038. At least, the test 919 # pass with NTFS filesystem. 920 if self.get_file_system(self.dirname) != "NTFS": 921 self.skipTest("requires NTFS") 922 923 large = 5000000000 # some day in 2128 924 os.utime(self.fname, (large, large)) 925 self.assertEqual(os.stat(self.fname).st_mtime, large) 926 927 def test_utime_invalid_arguments(self): 928 # seconds and nanoseconds parameters are mutually exclusive 929 with self.assertRaises(ValueError): 930 os.utime(self.fname, (5, 5), ns=(5, 5)) 931 with self.assertRaises(TypeError): 932 os.utime(self.fname, [5, 5]) 933 with self.assertRaises(TypeError): 934 os.utime(self.fname, (5,)) 935 with self.assertRaises(TypeError): 936 os.utime(self.fname, (5, 5, 5)) 937 with self.assertRaises(TypeError): 938 os.utime(self.fname, ns=[5, 5]) 939 with self.assertRaises(TypeError): 940 os.utime(self.fname, ns=(5,)) 941 with self.assertRaises(TypeError): 942 os.utime(self.fname, ns=(5, 5, 5)) 943 944 if os.utime not in os.supports_follow_symlinks: 945 with self.assertRaises(NotImplementedError): 946 os.utime(self.fname, (5, 5), follow_symlinks=False) 947 if os.utime not in os.supports_fd: 948 with open(self.fname, 'wb', 0) as fp: 949 with self.assertRaises(TypeError): 950 os.utime(fp.fileno(), (5, 5)) 951 if os.utime not in os.supports_dir_fd: 952 with self.assertRaises(NotImplementedError): 953 os.utime(self.fname, (5, 5), dir_fd=0) 954 955 @support.cpython_only 956 def test_issue31577(self): 957 # The interpreter shouldn't crash in case utime() received a bad 958 # ns argument. 959 def get_bad_int(divmod_ret_val): 960 class BadInt: 961 def __divmod__(*args): 962 return divmod_ret_val 963 return BadInt() 964 with self.assertRaises(TypeError): 965 os.utime(self.fname, ns=(get_bad_int(42), 1)) 966 with self.assertRaises(TypeError): 967 os.utime(self.fname, ns=(get_bad_int(()), 1)) 968 with self.assertRaises(TypeError): 969 os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1)) 970 971 972from test import mapping_tests 973 974class EnvironTests(mapping_tests.BasicTestMappingProtocol): 975 """check that os.environ object conform to mapping protocol""" 976 type2test = None 977 978 def setUp(self): 979 self.__save = dict(os.environ) 980 if os.supports_bytes_environ: 981 self.__saveb = dict(os.environb) 982 for key, value in self._reference().items(): 983 os.environ[key] = value 984 985 def tearDown(self): 986 os.environ.clear() 987 os.environ.update(self.__save) 988 if os.supports_bytes_environ: 989 os.environb.clear() 990 os.environb.update(self.__saveb) 991 992 def _reference(self): 993 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"} 994 995 def _empty_mapping(self): 996 os.environ.clear() 997 return os.environ 998 999 # Bug 1110478 1000 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell), 1001 'requires a shell') 1002 @unittest.skipUnless(hasattr(os, 'popen'), "needs os.popen()") 1003 @support.requires_subprocess() 1004 def test_update2(self): 1005 os.environ.clear() 1006 os.environ.update(HELLO="World") 1007 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen: 1008 value = popen.read().strip() 1009 self.assertEqual(value, "World") 1010 1011 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell), 1012 'requires a shell') 1013 @unittest.skipUnless(hasattr(os, 'popen'), "needs os.popen()") 1014 @support.requires_subprocess() 1015 def test_os_popen_iter(self): 1016 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'" 1017 % unix_shell) as popen: 1018 it = iter(popen) 1019 self.assertEqual(next(it), "line1\n") 1020 self.assertEqual(next(it), "line2\n") 1021 self.assertEqual(next(it), "line3\n") 1022 self.assertRaises(StopIteration, next, it) 1023 1024 # Verify environ keys and values from the OS are of the 1025 # correct str type. 1026 def test_keyvalue_types(self): 1027 for key, val in os.environ.items(): 1028 self.assertEqual(type(key), str) 1029 self.assertEqual(type(val), str) 1030 1031 def test_items(self): 1032 for key, value in self._reference().items(): 1033 self.assertEqual(os.environ.get(key), value) 1034 1035 # Issue 7310 1036 def test___repr__(self): 1037 """Check that the repr() of os.environ looks like environ({...}).""" 1038 env = os.environ 1039 formatted_items = ", ".join( 1040 f"{key!r}: {value!r}" 1041 for key, value in env.items() 1042 ) 1043 self.assertEqual(repr(env), f"environ({{{formatted_items}}})") 1044 1045 def test_get_exec_path(self): 1046 defpath_list = os.defpath.split(os.pathsep) 1047 test_path = ['/monty', '/python', '', '/flying/circus'] 1048 test_env = {'PATH': os.pathsep.join(test_path)} 1049 1050 saved_environ = os.environ 1051 try: 1052 os.environ = dict(test_env) 1053 # Test that defaulting to os.environ works. 1054 self.assertSequenceEqual(test_path, os.get_exec_path()) 1055 self.assertSequenceEqual(test_path, os.get_exec_path(env=None)) 1056 finally: 1057 os.environ = saved_environ 1058 1059 # No PATH environment variable 1060 self.assertSequenceEqual(defpath_list, os.get_exec_path({})) 1061 # Empty PATH environment variable 1062 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''})) 1063 # Supplied PATH environment variable 1064 self.assertSequenceEqual(test_path, os.get_exec_path(test_env)) 1065 1066 if os.supports_bytes_environ: 1067 # env cannot contain 'PATH' and b'PATH' keys 1068 try: 1069 # ignore BytesWarning warning 1070 with warnings.catch_warnings(record=True): 1071 mixed_env = {'PATH': '1', b'PATH': b'2'} 1072 except BytesWarning: 1073 # mixed_env cannot be created with python -bb 1074 pass 1075 else: 1076 self.assertRaises(ValueError, os.get_exec_path, mixed_env) 1077 1078 # bytes key and/or value 1079 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}), 1080 ['abc']) 1081 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}), 1082 ['abc']) 1083 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}), 1084 ['abc']) 1085 1086 @unittest.skipUnless(os.supports_bytes_environ, 1087 "os.environb required for this test.") 1088 def test_environb(self): 1089 # os.environ -> os.environb 1090 value = 'euro\u20ac' 1091 try: 1092 value_bytes = value.encode(sys.getfilesystemencoding(), 1093 'surrogateescape') 1094 except UnicodeEncodeError: 1095 msg = "U+20AC character is not encodable to %s" % ( 1096 sys.getfilesystemencoding(),) 1097 self.skipTest(msg) 1098 os.environ['unicode'] = value 1099 self.assertEqual(os.environ['unicode'], value) 1100 self.assertEqual(os.environb[b'unicode'], value_bytes) 1101 1102 # os.environb -> os.environ 1103 value = b'\xff' 1104 os.environb[b'bytes'] = value 1105 self.assertEqual(os.environb[b'bytes'], value) 1106 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape') 1107 self.assertEqual(os.environ['bytes'], value_str) 1108 1109 @support.requires_subprocess() 1110 def test_putenv_unsetenv(self): 1111 name = "PYTHONTESTVAR" 1112 value = "testvalue" 1113 code = f'import os; print(repr(os.environ.get({name!r})))' 1114 1115 with os_helper.EnvironmentVarGuard() as env: 1116 env.pop(name, None) 1117 1118 os.putenv(name, value) 1119 proc = subprocess.run([sys.executable, '-c', code], check=True, 1120 stdout=subprocess.PIPE, text=True) 1121 self.assertEqual(proc.stdout.rstrip(), repr(value)) 1122 1123 os.unsetenv(name) 1124 proc = subprocess.run([sys.executable, '-c', code], check=True, 1125 stdout=subprocess.PIPE, text=True) 1126 self.assertEqual(proc.stdout.rstrip(), repr(None)) 1127 1128 # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415). 1129 @support.requires_mac_ver(10, 6) 1130 def test_putenv_unsetenv_error(self): 1131 # Empty variable name is invalid. 1132 # "=" and null character are not allowed in a variable name. 1133 for name in ('', '=name', 'na=me', 'name=', 'name\0', 'na\0me'): 1134 self.assertRaises((OSError, ValueError), os.putenv, name, "value") 1135 self.assertRaises((OSError, ValueError), os.unsetenv, name) 1136 1137 if sys.platform == "win32": 1138 # On Windows, an environment variable string ("name=value" string) 1139 # is limited to 32,767 characters 1140 longstr = 'x' * 32_768 1141 self.assertRaises(ValueError, os.putenv, longstr, "1") 1142 self.assertRaises(ValueError, os.putenv, "X", longstr) 1143 self.assertRaises(ValueError, os.unsetenv, longstr) 1144 1145 def test_key_type(self): 1146 missing = 'missingkey' 1147 self.assertNotIn(missing, os.environ) 1148 1149 with self.assertRaises(KeyError) as cm: 1150 os.environ[missing] 1151 self.assertIs(cm.exception.args[0], missing) 1152 self.assertTrue(cm.exception.__suppress_context__) 1153 1154 with self.assertRaises(KeyError) as cm: 1155 del os.environ[missing] 1156 self.assertIs(cm.exception.args[0], missing) 1157 self.assertTrue(cm.exception.__suppress_context__) 1158 1159 def _test_environ_iteration(self, collection): 1160 iterator = iter(collection) 1161 new_key = "__new_key__" 1162 1163 next(iterator) # start iteration over os.environ.items 1164 1165 # add a new key in os.environ mapping 1166 os.environ[new_key] = "test_environ_iteration" 1167 1168 try: 1169 next(iterator) # force iteration over modified mapping 1170 self.assertEqual(os.environ[new_key], "test_environ_iteration") 1171 finally: 1172 del os.environ[new_key] 1173 1174 def test_iter_error_when_changing_os_environ(self): 1175 self._test_environ_iteration(os.environ) 1176 1177 def test_iter_error_when_changing_os_environ_items(self): 1178 self._test_environ_iteration(os.environ.items()) 1179 1180 def test_iter_error_when_changing_os_environ_values(self): 1181 self._test_environ_iteration(os.environ.values()) 1182 1183 def _test_underlying_process_env(self, var, expected): 1184 if not (unix_shell and os.path.exists(unix_shell)): 1185 return 1186 elif not support.has_subprocess_support: 1187 return 1188 1189 with os.popen(f"{unix_shell} -c 'echo ${var}'") as popen: 1190 value = popen.read().strip() 1191 1192 self.assertEqual(expected, value) 1193 1194 def test_or_operator(self): 1195 overridden_key = '_TEST_VAR_' 1196 original_value = 'original_value' 1197 os.environ[overridden_key] = original_value 1198 1199 new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'} 1200 expected = dict(os.environ) 1201 expected.update(new_vars_dict) 1202 1203 actual = os.environ | new_vars_dict 1204 self.assertDictEqual(expected, actual) 1205 self.assertEqual('3', actual[overridden_key]) 1206 1207 new_vars_items = new_vars_dict.items() 1208 self.assertIs(NotImplemented, os.environ.__or__(new_vars_items)) 1209 1210 self._test_underlying_process_env('_A_', '') 1211 self._test_underlying_process_env(overridden_key, original_value) 1212 1213 def test_ior_operator(self): 1214 overridden_key = '_TEST_VAR_' 1215 os.environ[overridden_key] = 'original_value' 1216 1217 new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'} 1218 expected = dict(os.environ) 1219 expected.update(new_vars_dict) 1220 1221 os.environ |= new_vars_dict 1222 self.assertEqual(expected, os.environ) 1223 self.assertEqual('3', os.environ[overridden_key]) 1224 1225 self._test_underlying_process_env('_A_', '1') 1226 self._test_underlying_process_env(overridden_key, '3') 1227 1228 def test_ior_operator_invalid_dicts(self): 1229 os_environ_copy = os.environ.copy() 1230 with self.assertRaises(TypeError): 1231 dict_with_bad_key = {1: '_A_'} 1232 os.environ |= dict_with_bad_key 1233 1234 with self.assertRaises(TypeError): 1235 dict_with_bad_val = {'_A_': 1} 1236 os.environ |= dict_with_bad_val 1237 1238 # Check nothing was added. 1239 self.assertEqual(os_environ_copy, os.environ) 1240 1241 def test_ior_operator_key_value_iterable(self): 1242 overridden_key = '_TEST_VAR_' 1243 os.environ[overridden_key] = 'original_value' 1244 1245 new_vars_items = (('_A_', '1'), ('_B_', '2'), (overridden_key, '3')) 1246 expected = dict(os.environ) 1247 expected.update(new_vars_items) 1248 1249 os.environ |= new_vars_items 1250 self.assertEqual(expected, os.environ) 1251 self.assertEqual('3', os.environ[overridden_key]) 1252 1253 self._test_underlying_process_env('_A_', '1') 1254 self._test_underlying_process_env(overridden_key, '3') 1255 1256 def test_ror_operator(self): 1257 overridden_key = '_TEST_VAR_' 1258 original_value = 'original_value' 1259 os.environ[overridden_key] = original_value 1260 1261 new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'} 1262 expected = dict(new_vars_dict) 1263 expected.update(os.environ) 1264 1265 actual = new_vars_dict | os.environ 1266 self.assertDictEqual(expected, actual) 1267 self.assertEqual(original_value, actual[overridden_key]) 1268 1269 new_vars_items = new_vars_dict.items() 1270 self.assertIs(NotImplemented, os.environ.__ror__(new_vars_items)) 1271 1272 self._test_underlying_process_env('_A_', '') 1273 self._test_underlying_process_env(overridden_key, original_value) 1274 1275 1276class WalkTests(unittest.TestCase): 1277 """Tests for os.walk().""" 1278 1279 # Wrapper to hide minor differences between os.walk and os.fwalk 1280 # to tests both functions with the same code base 1281 def walk(self, top, **kwargs): 1282 if 'follow_symlinks' in kwargs: 1283 kwargs['followlinks'] = kwargs.pop('follow_symlinks') 1284 return os.walk(top, **kwargs) 1285 1286 def setUp(self): 1287 join = os.path.join 1288 self.addCleanup(os_helper.rmtree, os_helper.TESTFN) 1289 1290 # Build: 1291 # TESTFN/ 1292 # TEST1/ a file kid and two directory kids 1293 # tmp1 1294 # SUB1/ a file kid and a directory kid 1295 # tmp2 1296 # SUB11/ no kids 1297 # SUB2/ a file kid and a dirsymlink kid 1298 # tmp3 1299 # SUB21/ not readable 1300 # tmp5 1301 # link/ a symlink to TESTFN.2 1302 # broken_link 1303 # broken_link2 1304 # broken_link3 1305 # TEST2/ 1306 # tmp4 a lone file 1307 self.walk_path = join(os_helper.TESTFN, "TEST1") 1308 self.sub1_path = join(self.walk_path, "SUB1") 1309 self.sub11_path = join(self.sub1_path, "SUB11") 1310 sub2_path = join(self.walk_path, "SUB2") 1311 sub21_path = join(sub2_path, "SUB21") 1312 tmp1_path = join(self.walk_path, "tmp1") 1313 tmp2_path = join(self.sub1_path, "tmp2") 1314 tmp3_path = join(sub2_path, "tmp3") 1315 tmp5_path = join(sub21_path, "tmp3") 1316 self.link_path = join(sub2_path, "link") 1317 t2_path = join(os_helper.TESTFN, "TEST2") 1318 tmp4_path = join(os_helper.TESTFN, "TEST2", "tmp4") 1319 broken_link_path = join(sub2_path, "broken_link") 1320 broken_link2_path = join(sub2_path, "broken_link2") 1321 broken_link3_path = join(sub2_path, "broken_link3") 1322 1323 # Create stuff. 1324 os.makedirs(self.sub11_path) 1325 os.makedirs(sub2_path) 1326 os.makedirs(sub21_path) 1327 os.makedirs(t2_path) 1328 1329 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path: 1330 with open(path, "x", encoding='utf-8') as f: 1331 f.write("I'm " + path + " and proud of it. Blame test_os.\n") 1332 1333 if os_helper.can_symlink(): 1334 os.symlink(os.path.abspath(t2_path), self.link_path) 1335 os.symlink('broken', broken_link_path, True) 1336 os.symlink(join('tmp3', 'broken'), broken_link2_path, True) 1337 os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True) 1338 self.sub2_tree = (sub2_path, ["SUB21", "link"], 1339 ["broken_link", "broken_link2", "broken_link3", 1340 "tmp3"]) 1341 else: 1342 self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"]) 1343 1344 if not support.is_emscripten: 1345 # Emscripten fails with inaccessible directory 1346 os.chmod(sub21_path, 0) 1347 try: 1348 os.listdir(sub21_path) 1349 except PermissionError: 1350 self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU) 1351 else: 1352 os.chmod(sub21_path, stat.S_IRWXU) 1353 os.unlink(tmp5_path) 1354 os.rmdir(sub21_path) 1355 del self.sub2_tree[1][:1] 1356 1357 def test_walk_topdown(self): 1358 # Walk top-down. 1359 all = list(self.walk(self.walk_path)) 1360 1361 self.assertEqual(len(all), 4) 1362 # We can't know which order SUB1 and SUB2 will appear in. 1363 # Not flipped: TESTFN, SUB1, SUB11, SUB2 1364 # flipped: TESTFN, SUB2, SUB1, SUB11 1365 flipped = all[0][1][0] != "SUB1" 1366 all[0][1].sort() 1367 all[3 - 2 * flipped][-1].sort() 1368 all[3 - 2 * flipped][1].sort() 1369 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"])) 1370 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"])) 1371 self.assertEqual(all[2 + flipped], (self.sub11_path, [], [])) 1372 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree) 1373 1374 def test_walk_prune(self, walk_path=None): 1375 if walk_path is None: 1376 walk_path = self.walk_path 1377 # Prune the search. 1378 all = [] 1379 for root, dirs, files in self.walk(walk_path): 1380 all.append((root, dirs, files)) 1381 # Don't descend into SUB1. 1382 if 'SUB1' in dirs: 1383 # Note that this also mutates the dirs we appended to all! 1384 dirs.remove('SUB1') 1385 1386 self.assertEqual(len(all), 2) 1387 self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"])) 1388 1389 all[1][-1].sort() 1390 all[1][1].sort() 1391 self.assertEqual(all[1], self.sub2_tree) 1392 1393 def test_file_like_path(self): 1394 self.test_walk_prune(FakePath(self.walk_path)) 1395 1396 def test_walk_bottom_up(self): 1397 # Walk bottom-up. 1398 all = list(self.walk(self.walk_path, topdown=False)) 1399 1400 self.assertEqual(len(all), 4, all) 1401 # We can't know which order SUB1 and SUB2 will appear in. 1402 # Not flipped: SUB11, SUB1, SUB2, TESTFN 1403 # flipped: SUB2, SUB11, SUB1, TESTFN 1404 flipped = all[3][1][0] != "SUB1" 1405 all[3][1].sort() 1406 all[2 - 2 * flipped][-1].sort() 1407 all[2 - 2 * flipped][1].sort() 1408 self.assertEqual(all[3], 1409 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"])) 1410 self.assertEqual(all[flipped], 1411 (self.sub11_path, [], [])) 1412 self.assertEqual(all[flipped + 1], 1413 (self.sub1_path, ["SUB11"], ["tmp2"])) 1414 self.assertEqual(all[2 - 2 * flipped], 1415 self.sub2_tree) 1416 1417 def test_walk_symlink(self): 1418 if not os_helper.can_symlink(): 1419 self.skipTest("need symlink support") 1420 1421 # Walk, following symlinks. 1422 walk_it = self.walk(self.walk_path, follow_symlinks=True) 1423 for root, dirs, files in walk_it: 1424 if root == self.link_path: 1425 self.assertEqual(dirs, []) 1426 self.assertEqual(files, ["tmp4"]) 1427 break 1428 else: 1429 self.fail("Didn't follow symlink with followlinks=True") 1430 1431 def test_walk_bad_dir(self): 1432 # Walk top-down. 1433 errors = [] 1434 walk_it = self.walk(self.walk_path, onerror=errors.append) 1435 root, dirs, files = next(walk_it) 1436 self.assertEqual(errors, []) 1437 dir1 = 'SUB1' 1438 path1 = os.path.join(root, dir1) 1439 path1new = os.path.join(root, dir1 + '.new') 1440 os.rename(path1, path1new) 1441 try: 1442 roots = [r for r, d, f in walk_it] 1443 self.assertTrue(errors) 1444 self.assertNotIn(path1, roots) 1445 self.assertNotIn(path1new, roots) 1446 for dir2 in dirs: 1447 if dir2 != dir1: 1448 self.assertIn(os.path.join(root, dir2), roots) 1449 finally: 1450 os.rename(path1new, path1) 1451 1452 def test_walk_many_open_files(self): 1453 depth = 30 1454 base = os.path.join(os_helper.TESTFN, 'deep') 1455 p = os.path.join(base, *(['d']*depth)) 1456 os.makedirs(p) 1457 1458 iters = [self.walk(base, topdown=False) for j in range(100)] 1459 for i in range(depth + 1): 1460 expected = (p, ['d'] if i else [], []) 1461 for it in iters: 1462 self.assertEqual(next(it), expected) 1463 p = os.path.dirname(p) 1464 1465 iters = [self.walk(base, topdown=True) for j in range(100)] 1466 p = base 1467 for i in range(depth + 1): 1468 expected = (p, ['d'] if i < depth else [], []) 1469 for it in iters: 1470 self.assertEqual(next(it), expected) 1471 p = os.path.join(p, 'd') 1472 1473 1474@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()") 1475class FwalkTests(WalkTests): 1476 """Tests for os.fwalk().""" 1477 1478 def walk(self, top, **kwargs): 1479 for root, dirs, files, root_fd in self.fwalk(top, **kwargs): 1480 yield (root, dirs, files) 1481 1482 def fwalk(self, *args, **kwargs): 1483 return os.fwalk(*args, **kwargs) 1484 1485 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs): 1486 """ 1487 compare with walk() results. 1488 """ 1489 walk_kwargs = walk_kwargs.copy() 1490 fwalk_kwargs = fwalk_kwargs.copy() 1491 for topdown, follow_symlinks in itertools.product((True, False), repeat=2): 1492 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks) 1493 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks) 1494 1495 expected = {} 1496 for root, dirs, files in os.walk(**walk_kwargs): 1497 expected[root] = (set(dirs), set(files)) 1498 1499 for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs): 1500 self.assertIn(root, expected) 1501 self.assertEqual(expected[root], (set(dirs), set(files))) 1502 1503 def test_compare_to_walk(self): 1504 kwargs = {'top': os_helper.TESTFN} 1505 self._compare_to_walk(kwargs, kwargs) 1506 1507 def test_dir_fd(self): 1508 try: 1509 fd = os.open(".", os.O_RDONLY) 1510 walk_kwargs = {'top': os_helper.TESTFN} 1511 fwalk_kwargs = walk_kwargs.copy() 1512 fwalk_kwargs['dir_fd'] = fd 1513 self._compare_to_walk(walk_kwargs, fwalk_kwargs) 1514 finally: 1515 os.close(fd) 1516 1517 def test_yields_correct_dir_fd(self): 1518 # check returned file descriptors 1519 for topdown, follow_symlinks in itertools.product((True, False), repeat=2): 1520 args = os_helper.TESTFN, topdown, None 1521 for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks): 1522 # check that the FD is valid 1523 os.fstat(rootfd) 1524 # redundant check 1525 os.stat(rootfd) 1526 # check that listdir() returns consistent information 1527 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files)) 1528 1529 @unittest.skipIf( 1530 support.is_emscripten, "Cannot dup stdout on Emscripten" 1531 ) 1532 def test_fd_leak(self): 1533 # Since we're opening a lot of FDs, we must be careful to avoid leaks: 1534 # we both check that calling fwalk() a large number of times doesn't 1535 # yield EMFILE, and that the minimum allocated FD hasn't changed. 1536 minfd = os.dup(1) 1537 os.close(minfd) 1538 for i in range(256): 1539 for x in self.fwalk(os_helper.TESTFN): 1540 pass 1541 newfd = os.dup(1) 1542 self.addCleanup(os.close, newfd) 1543 self.assertEqual(newfd, minfd) 1544 1545 # fwalk() keeps file descriptors open 1546 test_walk_many_open_files = None 1547 1548 1549class BytesWalkTests(WalkTests): 1550 """Tests for os.walk() with bytes.""" 1551 def walk(self, top, **kwargs): 1552 if 'follow_symlinks' in kwargs: 1553 kwargs['followlinks'] = kwargs.pop('follow_symlinks') 1554 for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs): 1555 root = os.fsdecode(broot) 1556 dirs = list(map(os.fsdecode, bdirs)) 1557 files = list(map(os.fsdecode, bfiles)) 1558 yield (root, dirs, files) 1559 bdirs[:] = list(map(os.fsencode, dirs)) 1560 bfiles[:] = list(map(os.fsencode, files)) 1561 1562@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()") 1563class BytesFwalkTests(FwalkTests): 1564 """Tests for os.walk() with bytes.""" 1565 def fwalk(self, top='.', *args, **kwargs): 1566 for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs): 1567 root = os.fsdecode(broot) 1568 dirs = list(map(os.fsdecode, bdirs)) 1569 files = list(map(os.fsdecode, bfiles)) 1570 yield (root, dirs, files, topfd) 1571 bdirs[:] = list(map(os.fsencode, dirs)) 1572 bfiles[:] = list(map(os.fsencode, files)) 1573 1574 1575class MakedirTests(unittest.TestCase): 1576 def setUp(self): 1577 os.mkdir(os_helper.TESTFN) 1578 1579 def test_makedir(self): 1580 base = os_helper.TESTFN 1581 path = os.path.join(base, 'dir1', 'dir2', 'dir3') 1582 os.makedirs(path) # Should work 1583 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4') 1584 os.makedirs(path) 1585 1586 # Try paths with a '.' in them 1587 self.assertRaises(OSError, os.makedirs, os.curdir) 1588 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir) 1589 os.makedirs(path) 1590 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4', 1591 'dir5', 'dir6') 1592 os.makedirs(path) 1593 1594 @unittest.skipIf( 1595 support.is_emscripten or support.is_wasi, 1596 "Emscripten's/WASI's umask is a stub." 1597 ) 1598 def test_mode(self): 1599 with os_helper.temp_umask(0o002): 1600 base = os_helper.TESTFN 1601 parent = os.path.join(base, 'dir1') 1602 path = os.path.join(parent, 'dir2') 1603 os.makedirs(path, 0o555) 1604 self.assertTrue(os.path.exists(path)) 1605 self.assertTrue(os.path.isdir(path)) 1606 if os.name != 'nt': 1607 self.assertEqual(os.stat(path).st_mode & 0o777, 0o555) 1608 self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775) 1609 1610 @unittest.skipIf( 1611 support.is_emscripten or support.is_wasi, 1612 "Emscripten's/WASI's umask is a stub." 1613 ) 1614 def test_exist_ok_existing_directory(self): 1615 path = os.path.join(os_helper.TESTFN, 'dir1') 1616 mode = 0o777 1617 old_mask = os.umask(0o022) 1618 os.makedirs(path, mode) 1619 self.assertRaises(OSError, os.makedirs, path, mode) 1620 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False) 1621 os.makedirs(path, 0o776, exist_ok=True) 1622 os.makedirs(path, mode=mode, exist_ok=True) 1623 os.umask(old_mask) 1624 1625 # Issue #25583: A drive root could raise PermissionError on Windows 1626 os.makedirs(os.path.abspath('/'), exist_ok=True) 1627 1628 @unittest.skipIf( 1629 support.is_emscripten or support.is_wasi, 1630 "Emscripten's/WASI's umask is a stub." 1631 ) 1632 def test_exist_ok_s_isgid_directory(self): 1633 path = os.path.join(os_helper.TESTFN, 'dir1') 1634 S_ISGID = stat.S_ISGID 1635 mode = 0o777 1636 old_mask = os.umask(0o022) 1637 try: 1638 existing_testfn_mode = stat.S_IMODE( 1639 os.lstat(os_helper.TESTFN).st_mode) 1640 try: 1641 os.chmod(os_helper.TESTFN, existing_testfn_mode | S_ISGID) 1642 except PermissionError: 1643 raise unittest.SkipTest('Cannot set S_ISGID for dir.') 1644 if (os.lstat(os_helper.TESTFN).st_mode & S_ISGID != S_ISGID): 1645 raise unittest.SkipTest('No support for S_ISGID dir mode.') 1646 # The os should apply S_ISGID from the parent dir for us, but 1647 # this test need not depend on that behavior. Be explicit. 1648 os.makedirs(path, mode | S_ISGID) 1649 # http://bugs.python.org/issue14992 1650 # Should not fail when the bit is already set. 1651 os.makedirs(path, mode, exist_ok=True) 1652 # remove the bit. 1653 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID) 1654 # May work even when the bit is not already set when demanded. 1655 os.makedirs(path, mode | S_ISGID, exist_ok=True) 1656 finally: 1657 os.umask(old_mask) 1658 1659 def test_exist_ok_existing_regular_file(self): 1660 base = os_helper.TESTFN 1661 path = os.path.join(os_helper.TESTFN, 'dir1') 1662 with open(path, 'w', encoding='utf-8') as f: 1663 f.write('abc') 1664 self.assertRaises(OSError, os.makedirs, path) 1665 self.assertRaises(OSError, os.makedirs, path, exist_ok=False) 1666 self.assertRaises(OSError, os.makedirs, path, exist_ok=True) 1667 os.remove(path) 1668 1669 def tearDown(self): 1670 path = os.path.join(os_helper.TESTFN, 'dir1', 'dir2', 'dir3', 1671 'dir4', 'dir5', 'dir6') 1672 # If the tests failed, the bottom-most directory ('../dir6') 1673 # may not have been created, so we look for the outermost directory 1674 # that exists. 1675 while not os.path.exists(path) and path != os_helper.TESTFN: 1676 path = os.path.dirname(path) 1677 1678 os.removedirs(path) 1679 1680 1681@os_helper.skip_unless_working_chmod 1682class ChownFileTests(unittest.TestCase): 1683 1684 @classmethod 1685 def setUpClass(cls): 1686 os.mkdir(os_helper.TESTFN) 1687 1688 def test_chown_uid_gid_arguments_must_be_index(self): 1689 stat = os.stat(os_helper.TESTFN) 1690 uid = stat.st_uid 1691 gid = stat.st_gid 1692 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)): 1693 self.assertRaises(TypeError, os.chown, os_helper.TESTFN, value, gid) 1694 self.assertRaises(TypeError, os.chown, os_helper.TESTFN, uid, value) 1695 self.assertIsNone(os.chown(os_helper.TESTFN, uid, gid)) 1696 self.assertIsNone(os.chown(os_helper.TESTFN, -1, -1)) 1697 1698 @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups') 1699 def test_chown_gid(self): 1700 groups = os.getgroups() 1701 if len(groups) < 2: 1702 self.skipTest("test needs at least 2 groups") 1703 1704 gid_1, gid_2 = groups[:2] 1705 uid = os.stat(os_helper.TESTFN).st_uid 1706 1707 os.chown(os_helper.TESTFN, uid, gid_1) 1708 gid = os.stat(os_helper.TESTFN).st_gid 1709 self.assertEqual(gid, gid_1) 1710 1711 os.chown(os_helper.TESTFN, uid, gid_2) 1712 gid = os.stat(os_helper.TESTFN).st_gid 1713 self.assertEqual(gid, gid_2) 1714 1715 @unittest.skipUnless(root_in_posix and len(all_users) > 1, 1716 "test needs root privilege and more than one user") 1717 def test_chown_with_root(self): 1718 uid_1, uid_2 = all_users[:2] 1719 gid = os.stat(os_helper.TESTFN).st_gid 1720 os.chown(os_helper.TESTFN, uid_1, gid) 1721 uid = os.stat(os_helper.TESTFN).st_uid 1722 self.assertEqual(uid, uid_1) 1723 os.chown(os_helper.TESTFN, uid_2, gid) 1724 uid = os.stat(os_helper.TESTFN).st_uid 1725 self.assertEqual(uid, uid_2) 1726 1727 @unittest.skipUnless(not root_in_posix and len(all_users) > 1, 1728 "test needs non-root account and more than one user") 1729 def test_chown_without_permission(self): 1730 uid_1, uid_2 = all_users[:2] 1731 gid = os.stat(os_helper.TESTFN).st_gid 1732 with self.assertRaises(PermissionError): 1733 os.chown(os_helper.TESTFN, uid_1, gid) 1734 os.chown(os_helper.TESTFN, uid_2, gid) 1735 1736 @classmethod 1737 def tearDownClass(cls): 1738 os.rmdir(os_helper.TESTFN) 1739 1740 1741class RemoveDirsTests(unittest.TestCase): 1742 def setUp(self): 1743 os.makedirs(os_helper.TESTFN) 1744 1745 def tearDown(self): 1746 os_helper.rmtree(os_helper.TESTFN) 1747 1748 def test_remove_all(self): 1749 dira = os.path.join(os_helper.TESTFN, 'dira') 1750 os.mkdir(dira) 1751 dirb = os.path.join(dira, 'dirb') 1752 os.mkdir(dirb) 1753 os.removedirs(dirb) 1754 self.assertFalse(os.path.exists(dirb)) 1755 self.assertFalse(os.path.exists(dira)) 1756 self.assertFalse(os.path.exists(os_helper.TESTFN)) 1757 1758 def test_remove_partial(self): 1759 dira = os.path.join(os_helper.TESTFN, 'dira') 1760 os.mkdir(dira) 1761 dirb = os.path.join(dira, 'dirb') 1762 os.mkdir(dirb) 1763 create_file(os.path.join(dira, 'file.txt')) 1764 os.removedirs(dirb) 1765 self.assertFalse(os.path.exists(dirb)) 1766 self.assertTrue(os.path.exists(dira)) 1767 self.assertTrue(os.path.exists(os_helper.TESTFN)) 1768 1769 def test_remove_nothing(self): 1770 dira = os.path.join(os_helper.TESTFN, 'dira') 1771 os.mkdir(dira) 1772 dirb = os.path.join(dira, 'dirb') 1773 os.mkdir(dirb) 1774 create_file(os.path.join(dirb, 'file.txt')) 1775 with self.assertRaises(OSError): 1776 os.removedirs(dirb) 1777 self.assertTrue(os.path.exists(dirb)) 1778 self.assertTrue(os.path.exists(dira)) 1779 self.assertTrue(os.path.exists(os_helper.TESTFN)) 1780 1781 1782@unittest.skipIf(support.is_wasi, "WASI has no /dev/null") 1783class DevNullTests(unittest.TestCase): 1784 def test_devnull(self): 1785 with open(os.devnull, 'wb', 0) as f: 1786 f.write(b'hello') 1787 f.close() 1788 with open(os.devnull, 'rb') as f: 1789 self.assertEqual(f.read(), b'') 1790 1791 1792class URandomTests(unittest.TestCase): 1793 def test_urandom_length(self): 1794 self.assertEqual(len(os.urandom(0)), 0) 1795 self.assertEqual(len(os.urandom(1)), 1) 1796 self.assertEqual(len(os.urandom(10)), 10) 1797 self.assertEqual(len(os.urandom(100)), 100) 1798 self.assertEqual(len(os.urandom(1000)), 1000) 1799 1800 def test_urandom_value(self): 1801 data1 = os.urandom(16) 1802 self.assertIsInstance(data1, bytes) 1803 data2 = os.urandom(16) 1804 self.assertNotEqual(data1, data2) 1805 1806 def get_urandom_subprocess(self, count): 1807 code = '\n'.join(( 1808 'import os, sys', 1809 'data = os.urandom(%s)' % count, 1810 'sys.stdout.buffer.write(data)', 1811 'sys.stdout.buffer.flush()')) 1812 out = assert_python_ok('-c', code) 1813 stdout = out[1] 1814 self.assertEqual(len(stdout), count) 1815 return stdout 1816 1817 def test_urandom_subprocess(self): 1818 data1 = self.get_urandom_subprocess(16) 1819 data2 = self.get_urandom_subprocess(16) 1820 self.assertNotEqual(data1, data2) 1821 1822 1823@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()') 1824class GetRandomTests(unittest.TestCase): 1825 @classmethod 1826 def setUpClass(cls): 1827 try: 1828 os.getrandom(1) 1829 except OSError as exc: 1830 if exc.errno == errno.ENOSYS: 1831 # Python compiled on a more recent Linux version 1832 # than the current Linux kernel 1833 raise unittest.SkipTest("getrandom() syscall fails with ENOSYS") 1834 else: 1835 raise 1836 1837 def test_getrandom_type(self): 1838 data = os.getrandom(16) 1839 self.assertIsInstance(data, bytes) 1840 self.assertEqual(len(data), 16) 1841 1842 def test_getrandom0(self): 1843 empty = os.getrandom(0) 1844 self.assertEqual(empty, b'') 1845 1846 def test_getrandom_random(self): 1847 self.assertTrue(hasattr(os, 'GRND_RANDOM')) 1848 1849 # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare 1850 # resource /dev/random 1851 1852 def test_getrandom_nonblock(self): 1853 # The call must not fail. Check also that the flag exists 1854 try: 1855 os.getrandom(1, os.GRND_NONBLOCK) 1856 except BlockingIOError: 1857 # System urandom is not initialized yet 1858 pass 1859 1860 def test_getrandom_value(self): 1861 data1 = os.getrandom(16) 1862 data2 = os.getrandom(16) 1863 self.assertNotEqual(data1, data2) 1864 1865 1866# os.urandom() doesn't use a file descriptor when it is implemented with the 1867# getentropy() function, the getrandom() function or the getrandom() syscall 1868OS_URANDOM_DONT_USE_FD = ( 1869 sysconfig.get_config_var('HAVE_GETENTROPY') == 1 1870 or sysconfig.get_config_var('HAVE_GETRANDOM') == 1 1871 or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1) 1872 1873@unittest.skipIf(OS_URANDOM_DONT_USE_FD , 1874 "os.random() does not use a file descriptor") 1875@unittest.skipIf(sys.platform == "vxworks", 1876 "VxWorks can't set RLIMIT_NOFILE to 1") 1877class URandomFDTests(unittest.TestCase): 1878 @unittest.skipUnless(resource, "test requires the resource module") 1879 def test_urandom_failure(self): 1880 # Check urandom() failing when it is not able to open /dev/random. 1881 # We spawn a new process to make the test more robust (if getrlimit() 1882 # failed to restore the file descriptor limit after this, the whole 1883 # test suite would crash; this actually happened on the OS X Tiger 1884 # buildbot). 1885 code = """if 1: 1886 import errno 1887 import os 1888 import resource 1889 1890 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE) 1891 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit)) 1892 try: 1893 os.urandom(16) 1894 except OSError as e: 1895 assert e.errno == errno.EMFILE, e.errno 1896 else: 1897 raise AssertionError("OSError not raised") 1898 """ 1899 assert_python_ok('-c', code) 1900 1901 def test_urandom_fd_closed(self): 1902 # Issue #21207: urandom() should reopen its fd to /dev/urandom if 1903 # closed. 1904 code = """if 1: 1905 import os 1906 import sys 1907 import test.support 1908 os.urandom(4) 1909 with test.support.SuppressCrashReport(): 1910 os.closerange(3, 256) 1911 sys.stdout.buffer.write(os.urandom(4)) 1912 """ 1913 rc, out, err = assert_python_ok('-Sc', code) 1914 1915 def test_urandom_fd_reopened(self): 1916 # Issue #21207: urandom() should detect its fd to /dev/urandom 1917 # changed to something else, and reopen it. 1918 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 1919 create_file(os_helper.TESTFN, b"x" * 256) 1920 1921 code = """if 1: 1922 import os 1923 import sys 1924 import test.support 1925 os.urandom(4) 1926 with test.support.SuppressCrashReport(): 1927 for fd in range(3, 256): 1928 try: 1929 os.close(fd) 1930 except OSError: 1931 pass 1932 else: 1933 # Found the urandom fd (XXX hopefully) 1934 break 1935 os.closerange(3, 256) 1936 with open({TESTFN!r}, 'rb') as f: 1937 new_fd = f.fileno() 1938 # Issue #26935: posix allows new_fd and fd to be equal but 1939 # some libc implementations have dup2 return an error in this 1940 # case. 1941 if new_fd != fd: 1942 os.dup2(new_fd, fd) 1943 sys.stdout.buffer.write(os.urandom(4)) 1944 sys.stdout.buffer.write(os.urandom(4)) 1945 """.format(TESTFN=os_helper.TESTFN) 1946 rc, out, err = assert_python_ok('-Sc', code) 1947 self.assertEqual(len(out), 8) 1948 self.assertNotEqual(out[0:4], out[4:8]) 1949 rc, out2, err2 = assert_python_ok('-Sc', code) 1950 self.assertEqual(len(out2), 8) 1951 self.assertNotEqual(out2, out) 1952 1953 1954@contextlib.contextmanager 1955def _execvpe_mockup(defpath=None): 1956 """ 1957 Stubs out execv and execve functions when used as context manager. 1958 Records exec calls. The mock execv and execve functions always raise an 1959 exception as they would normally never return. 1960 """ 1961 # A list of tuples containing (function name, first arg, args) 1962 # of calls to execv or execve that have been made. 1963 calls = [] 1964 1965 def mock_execv(name, *args): 1966 calls.append(('execv', name, args)) 1967 raise RuntimeError("execv called") 1968 1969 def mock_execve(name, *args): 1970 calls.append(('execve', name, args)) 1971 raise OSError(errno.ENOTDIR, "execve called") 1972 1973 try: 1974 orig_execv = os.execv 1975 orig_execve = os.execve 1976 orig_defpath = os.defpath 1977 os.execv = mock_execv 1978 os.execve = mock_execve 1979 if defpath is not None: 1980 os.defpath = defpath 1981 yield calls 1982 finally: 1983 os.execv = orig_execv 1984 os.execve = orig_execve 1985 os.defpath = orig_defpath 1986 1987@unittest.skipUnless(hasattr(os, 'execv'), 1988 "need os.execv()") 1989class ExecTests(unittest.TestCase): 1990 @unittest.skipIf(USING_LINUXTHREADS, 1991 "avoid triggering a linuxthreads bug: see issue #4970") 1992 def test_execvpe_with_bad_program(self): 1993 self.assertRaises(OSError, os.execvpe, 'no such app-', 1994 ['no such app-'], None) 1995 1996 def test_execv_with_bad_arglist(self): 1997 self.assertRaises(ValueError, os.execv, 'notepad', ()) 1998 self.assertRaises(ValueError, os.execv, 'notepad', []) 1999 self.assertRaises(ValueError, os.execv, 'notepad', ('',)) 2000 self.assertRaises(ValueError, os.execv, 'notepad', ['']) 2001 2002 def test_execvpe_with_bad_arglist(self): 2003 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None) 2004 self.assertRaises(ValueError, os.execvpe, 'notepad', [], {}) 2005 self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {}) 2006 2007 @unittest.skipUnless(hasattr(os, '_execvpe'), 2008 "No internal os._execvpe function to test.") 2009 def _test_internal_execvpe(self, test_type): 2010 program_path = os.sep + 'absolutepath' 2011 if test_type is bytes: 2012 program = b'executable' 2013 fullpath = os.path.join(os.fsencode(program_path), program) 2014 native_fullpath = fullpath 2015 arguments = [b'progname', 'arg1', 'arg2'] 2016 else: 2017 program = 'executable' 2018 arguments = ['progname', 'arg1', 'arg2'] 2019 fullpath = os.path.join(program_path, program) 2020 if os.name != "nt": 2021 native_fullpath = os.fsencode(fullpath) 2022 else: 2023 native_fullpath = fullpath 2024 env = {'spam': 'beans'} 2025 2026 # test os._execvpe() with an absolute path 2027 with _execvpe_mockup() as calls: 2028 self.assertRaises(RuntimeError, 2029 os._execvpe, fullpath, arguments) 2030 self.assertEqual(len(calls), 1) 2031 self.assertEqual(calls[0], ('execv', fullpath, (arguments,))) 2032 2033 # test os._execvpe() with a relative path: 2034 # os.get_exec_path() returns defpath 2035 with _execvpe_mockup(defpath=program_path) as calls: 2036 self.assertRaises(OSError, 2037 os._execvpe, program, arguments, env=env) 2038 self.assertEqual(len(calls), 1) 2039 self.assertSequenceEqual(calls[0], 2040 ('execve', native_fullpath, (arguments, env))) 2041 2042 # test os._execvpe() with a relative path: 2043 # os.get_exec_path() reads the 'PATH' variable 2044 with _execvpe_mockup() as calls: 2045 env_path = env.copy() 2046 if test_type is bytes: 2047 env_path[b'PATH'] = program_path 2048 else: 2049 env_path['PATH'] = program_path 2050 self.assertRaises(OSError, 2051 os._execvpe, program, arguments, env=env_path) 2052 self.assertEqual(len(calls), 1) 2053 self.assertSequenceEqual(calls[0], 2054 ('execve', native_fullpath, (arguments, env_path))) 2055 2056 def test_internal_execvpe_str(self): 2057 self._test_internal_execvpe(str) 2058 if os.name != "nt": 2059 self._test_internal_execvpe(bytes) 2060 2061 def test_execve_invalid_env(self): 2062 args = [sys.executable, '-c', 'pass'] 2063 2064 # null character in the environment variable name 2065 newenv = os.environ.copy() 2066 newenv["FRUIT\0VEGETABLE"] = "cabbage" 2067 with self.assertRaises(ValueError): 2068 os.execve(args[0], args, newenv) 2069 2070 # null character in the environment variable value 2071 newenv = os.environ.copy() 2072 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage" 2073 with self.assertRaises(ValueError): 2074 os.execve(args[0], args, newenv) 2075 2076 # equal character in the environment variable name 2077 newenv = os.environ.copy() 2078 newenv["FRUIT=ORANGE"] = "lemon" 2079 with self.assertRaises(ValueError): 2080 os.execve(args[0], args, newenv) 2081 2082 @unittest.skipUnless(sys.platform == "win32", "Win32-specific test") 2083 def test_execve_with_empty_path(self): 2084 # bpo-32890: Check GetLastError() misuse 2085 try: 2086 os.execve('', ['arg'], {}) 2087 except OSError as e: 2088 self.assertTrue(e.winerror is None or e.winerror != 0) 2089 else: 2090 self.fail('No OSError raised') 2091 2092 2093@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 2094class Win32ErrorTests(unittest.TestCase): 2095 def setUp(self): 2096 try: 2097 os.stat(os_helper.TESTFN) 2098 except FileNotFoundError: 2099 exists = False 2100 except OSError as exc: 2101 exists = True 2102 self.fail("file %s must not exist; os.stat failed with %s" 2103 % (os_helper.TESTFN, exc)) 2104 else: 2105 self.fail("file %s must not exist" % os_helper.TESTFN) 2106 2107 def test_rename(self): 2108 self.assertRaises(OSError, os.rename, os_helper.TESTFN, os_helper.TESTFN+".bak") 2109 2110 def test_remove(self): 2111 self.assertRaises(OSError, os.remove, os_helper.TESTFN) 2112 2113 def test_chdir(self): 2114 self.assertRaises(OSError, os.chdir, os_helper.TESTFN) 2115 2116 def test_mkdir(self): 2117 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 2118 2119 with open(os_helper.TESTFN, "x") as f: 2120 self.assertRaises(OSError, os.mkdir, os_helper.TESTFN) 2121 2122 def test_utime(self): 2123 self.assertRaises(OSError, os.utime, os_helper.TESTFN, None) 2124 2125 def test_chmod(self): 2126 self.assertRaises(OSError, os.chmod, os_helper.TESTFN, 0) 2127 2128 2129@unittest.skipIf(support.is_wasi, "Cannot create invalid FD on WASI.") 2130class TestInvalidFD(unittest.TestCase): 2131 singles = ["fchdir", "dup", "fdatasync", "fstat", 2132 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"] 2133 #singles.append("close") 2134 #We omit close because it doesn't raise an exception on some platforms 2135 def get_single(f): 2136 def helper(self): 2137 if hasattr(os, f): 2138 self.check(getattr(os, f)) 2139 return helper 2140 for f in singles: 2141 locals()["test_"+f] = get_single(f) 2142 2143 def check(self, f, *args, **kwargs): 2144 try: 2145 f(os_helper.make_bad_fd(), *args, **kwargs) 2146 except OSError as e: 2147 self.assertEqual(e.errno, errno.EBADF) 2148 else: 2149 self.fail("%r didn't raise an OSError with a bad file descriptor" 2150 % f) 2151 2152 def test_fdopen(self): 2153 self.check(os.fdopen, encoding="utf-8") 2154 2155 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()') 2156 def test_isatty(self): 2157 self.assertEqual(os.isatty(os_helper.make_bad_fd()), False) 2158 2159 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()') 2160 def test_closerange(self): 2161 fd = os_helper.make_bad_fd() 2162 # Make sure none of the descriptors we are about to close are 2163 # currently valid (issue 6542). 2164 for i in range(10): 2165 try: os.fstat(fd+i) 2166 except OSError: 2167 pass 2168 else: 2169 break 2170 if i < 2: 2171 raise unittest.SkipTest( 2172 "Unable to acquire a range of invalid file descriptors") 2173 self.assertEqual(os.closerange(fd, fd + i-1), None) 2174 2175 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()') 2176 def test_dup2(self): 2177 self.check(os.dup2, 20) 2178 2179 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()') 2180 @unittest.skipIf( 2181 support.is_emscripten, 2182 "dup2() with negative fds is broken on Emscripten (see gh-102179)" 2183 ) 2184 def test_dup2_negative_fd(self): 2185 valid_fd = os.open(__file__, os.O_RDONLY) 2186 self.addCleanup(os.close, valid_fd) 2187 fds = [ 2188 valid_fd, 2189 -1, 2190 -2**31, 2191 ] 2192 for fd, fd2 in itertools.product(fds, repeat=2): 2193 if fd != fd2: 2194 with self.subTest(fd=fd, fd2=fd2): 2195 with self.assertRaises(OSError) as ctx: 2196 os.dup2(fd, fd2) 2197 self.assertEqual(ctx.exception.errno, errno.EBADF) 2198 2199 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()') 2200 def test_fchmod(self): 2201 self.check(os.fchmod, 0) 2202 2203 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()') 2204 def test_fchown(self): 2205 self.check(os.fchown, -1, -1) 2206 2207 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()') 2208 @unittest.skipIf( 2209 support.is_emscripten or support.is_wasi, 2210 "musl libc issue on Emscripten/WASI, bpo-46390" 2211 ) 2212 def test_fpathconf(self): 2213 self.check(os.pathconf, "PC_NAME_MAX") 2214 self.check(os.fpathconf, "PC_NAME_MAX") 2215 2216 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()') 2217 def test_ftruncate(self): 2218 self.check(os.truncate, 0) 2219 self.check(os.ftruncate, 0) 2220 2221 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()') 2222 def test_lseek(self): 2223 self.check(os.lseek, 0, 0) 2224 2225 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()') 2226 def test_read(self): 2227 self.check(os.read, 1) 2228 2229 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()') 2230 def test_readv(self): 2231 buf = bytearray(10) 2232 self.check(os.readv, [buf]) 2233 2234 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()') 2235 def test_tcsetpgrpt(self): 2236 self.check(os.tcsetpgrp, 0) 2237 2238 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()') 2239 def test_write(self): 2240 self.check(os.write, b" ") 2241 2242 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()') 2243 def test_writev(self): 2244 self.check(os.writev, [b'abc']) 2245 2246 @support.requires_subprocess() 2247 def test_inheritable(self): 2248 self.check(os.get_inheritable) 2249 self.check(os.set_inheritable, True) 2250 2251 @unittest.skipUnless(hasattr(os, 'get_blocking'), 2252 'needs os.get_blocking() and os.set_blocking()') 2253 def test_blocking(self): 2254 self.check(os.get_blocking) 2255 self.check(os.set_blocking, True) 2256 2257 2258@unittest.skipUnless(hasattr(os, 'link'), 'requires os.link') 2259class LinkTests(unittest.TestCase): 2260 def setUp(self): 2261 self.file1 = os_helper.TESTFN 2262 self.file2 = os.path.join(os_helper.TESTFN + "2") 2263 2264 def tearDown(self): 2265 for file in (self.file1, self.file2): 2266 if os.path.exists(file): 2267 os.unlink(file) 2268 2269 def _test_link(self, file1, file2): 2270 create_file(file1) 2271 2272 try: 2273 os.link(file1, file2) 2274 except PermissionError as e: 2275 self.skipTest('os.link(): %s' % e) 2276 with open(file1, "rb") as f1, open(file2, "rb") as f2: 2277 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno())) 2278 2279 def test_link(self): 2280 self._test_link(self.file1, self.file2) 2281 2282 def test_link_bytes(self): 2283 self._test_link(bytes(self.file1, sys.getfilesystemencoding()), 2284 bytes(self.file2, sys.getfilesystemencoding())) 2285 2286 def test_unicode_name(self): 2287 try: 2288 os.fsencode("\xf1") 2289 except UnicodeError: 2290 raise unittest.SkipTest("Unable to encode for this platform.") 2291 2292 self.file1 += "\xf1" 2293 self.file2 = self.file1 + "2" 2294 self._test_link(self.file1, self.file2) 2295 2296@unittest.skipIf(sys.platform == "win32", "Posix specific tests") 2297class PosixUidGidTests(unittest.TestCase): 2298 # uid_t and gid_t are 32-bit unsigned integers on Linux 2299 UID_OVERFLOW = (1 << 32) 2300 GID_OVERFLOW = (1 << 32) 2301 2302 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()') 2303 def test_setuid(self): 2304 if os.getuid() != 0: 2305 self.assertRaises(OSError, os.setuid, 0) 2306 self.assertRaises(TypeError, os.setuid, 'not an int') 2307 self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW) 2308 2309 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()') 2310 def test_setgid(self): 2311 if os.getuid() != 0 and not HAVE_WHEEL_GROUP: 2312 self.assertRaises(OSError, os.setgid, 0) 2313 self.assertRaises(TypeError, os.setgid, 'not an int') 2314 self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW) 2315 2316 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()') 2317 def test_seteuid(self): 2318 if os.getuid() != 0: 2319 self.assertRaises(OSError, os.seteuid, 0) 2320 self.assertRaises(TypeError, os.setegid, 'not an int') 2321 self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW) 2322 2323 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()') 2324 def test_setegid(self): 2325 if os.getuid() != 0 and not HAVE_WHEEL_GROUP: 2326 self.assertRaises(OSError, os.setegid, 0) 2327 self.assertRaises(TypeError, os.setegid, 'not an int') 2328 self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW) 2329 2330 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()') 2331 def test_setreuid(self): 2332 if os.getuid() != 0: 2333 self.assertRaises(OSError, os.setreuid, 0, 0) 2334 self.assertRaises(TypeError, os.setreuid, 'not an int', 0) 2335 self.assertRaises(TypeError, os.setreuid, 0, 'not an int') 2336 self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0) 2337 self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW) 2338 2339 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()') 2340 @support.requires_subprocess() 2341 def test_setreuid_neg1(self): 2342 # Needs to accept -1. We run this in a subprocess to avoid 2343 # altering the test runner's process state (issue8045). 2344 subprocess.check_call([ 2345 sys.executable, '-c', 2346 'import os,sys;os.setreuid(-1,-1);sys.exit(0)']) 2347 2348 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()') 2349 @support.requires_subprocess() 2350 def test_setregid(self): 2351 if os.getuid() != 0 and not HAVE_WHEEL_GROUP: 2352 self.assertRaises(OSError, os.setregid, 0, 0) 2353 self.assertRaises(TypeError, os.setregid, 'not an int', 0) 2354 self.assertRaises(TypeError, os.setregid, 0, 'not an int') 2355 self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0) 2356 self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW) 2357 2358 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()') 2359 @support.requires_subprocess() 2360 def test_setregid_neg1(self): 2361 # Needs to accept -1. We run this in a subprocess to avoid 2362 # altering the test runner's process state (issue8045). 2363 subprocess.check_call([ 2364 sys.executable, '-c', 2365 'import os,sys;os.setregid(-1,-1);sys.exit(0)']) 2366 2367@unittest.skipIf(sys.platform == "win32", "Posix specific tests") 2368class Pep383Tests(unittest.TestCase): 2369 def setUp(self): 2370 if os_helper.TESTFN_UNENCODABLE: 2371 self.dir = os_helper.TESTFN_UNENCODABLE 2372 elif os_helper.TESTFN_NONASCII: 2373 self.dir = os_helper.TESTFN_NONASCII 2374 else: 2375 self.dir = os_helper.TESTFN 2376 self.bdir = os.fsencode(self.dir) 2377 2378 bytesfn = [] 2379 def add_filename(fn): 2380 try: 2381 fn = os.fsencode(fn) 2382 except UnicodeEncodeError: 2383 return 2384 bytesfn.append(fn) 2385 add_filename(os_helper.TESTFN_UNICODE) 2386 if os_helper.TESTFN_UNENCODABLE: 2387 add_filename(os_helper.TESTFN_UNENCODABLE) 2388 if os_helper.TESTFN_NONASCII: 2389 add_filename(os_helper.TESTFN_NONASCII) 2390 if not bytesfn: 2391 self.skipTest("couldn't create any non-ascii filename") 2392 2393 self.unicodefn = set() 2394 os.mkdir(self.dir) 2395 try: 2396 for fn in bytesfn: 2397 os_helper.create_empty_file(os.path.join(self.bdir, fn)) 2398 fn = os.fsdecode(fn) 2399 if fn in self.unicodefn: 2400 raise ValueError("duplicate filename") 2401 self.unicodefn.add(fn) 2402 except: 2403 shutil.rmtree(self.dir) 2404 raise 2405 2406 def tearDown(self): 2407 shutil.rmtree(self.dir) 2408 2409 def test_listdir(self): 2410 expected = self.unicodefn 2411 found = set(os.listdir(self.dir)) 2412 self.assertEqual(found, expected) 2413 # test listdir without arguments 2414 current_directory = os.getcwd() 2415 try: 2416 os.chdir(os.sep) 2417 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep))) 2418 finally: 2419 os.chdir(current_directory) 2420 2421 def test_open(self): 2422 for fn in self.unicodefn: 2423 f = open(os.path.join(self.dir, fn), 'rb') 2424 f.close() 2425 2426 @unittest.skipUnless(hasattr(os, 'statvfs'), 2427 "need os.statvfs()") 2428 def test_statvfs(self): 2429 # issue #9645 2430 for fn in self.unicodefn: 2431 # should not fail with file not found error 2432 fullname = os.path.join(self.dir, fn) 2433 os.statvfs(fullname) 2434 2435 def test_stat(self): 2436 for fn in self.unicodefn: 2437 os.stat(os.path.join(self.dir, fn)) 2438 2439@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 2440class Win32KillTests(unittest.TestCase): 2441 def _kill(self, sig): 2442 # Start sys.executable as a subprocess and communicate from the 2443 # subprocess to the parent that the interpreter is ready. When it 2444 # becomes ready, send *sig* via os.kill to the subprocess and check 2445 # that the return code is equal to *sig*. 2446 import ctypes 2447 from ctypes import wintypes 2448 import msvcrt 2449 2450 # Since we can't access the contents of the process' stdout until the 2451 # process has exited, use PeekNamedPipe to see what's inside stdout 2452 # without waiting. This is done so we can tell that the interpreter 2453 # is started and running at a point where it could handle a signal. 2454 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe 2455 PeekNamedPipe.restype = wintypes.BOOL 2456 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle 2457 ctypes.POINTER(ctypes.c_char), # stdout buf 2458 wintypes.DWORD, # Buffer size 2459 ctypes.POINTER(wintypes.DWORD), # bytes read 2460 ctypes.POINTER(wintypes.DWORD), # bytes avail 2461 ctypes.POINTER(wintypes.DWORD)) # bytes left 2462 msg = "running" 2463 proc = subprocess.Popen([sys.executable, "-c", 2464 "import sys;" 2465 "sys.stdout.write('{}');" 2466 "sys.stdout.flush();" 2467 "input()".format(msg)], 2468 stdout=subprocess.PIPE, 2469 stderr=subprocess.PIPE, 2470 stdin=subprocess.PIPE) 2471 self.addCleanup(proc.stdout.close) 2472 self.addCleanup(proc.stderr.close) 2473 self.addCleanup(proc.stdin.close) 2474 2475 count, max = 0, 100 2476 while count < max and proc.poll() is None: 2477 # Create a string buffer to store the result of stdout from the pipe 2478 buf = ctypes.create_string_buffer(len(msg)) 2479 # Obtain the text currently in proc.stdout 2480 # Bytes read/avail/left are left as NULL and unused 2481 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()), 2482 buf, ctypes.sizeof(buf), None, None, None) 2483 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed") 2484 if buf.value: 2485 self.assertEqual(msg, buf.value.decode()) 2486 break 2487 time.sleep(0.1) 2488 count += 1 2489 else: 2490 self.fail("Did not receive communication from the subprocess") 2491 2492 os.kill(proc.pid, sig) 2493 self.assertEqual(proc.wait(), sig) 2494 2495 def test_kill_sigterm(self): 2496 # SIGTERM doesn't mean anything special, but make sure it works 2497 self._kill(signal.SIGTERM) 2498 2499 def test_kill_int(self): 2500 # os.kill on Windows can take an int which gets set as the exit code 2501 self._kill(100) 2502 2503 @unittest.skipIf(mmap is None, "requires mmap") 2504 def _kill_with_event(self, event, name): 2505 tagname = "test_os_%s" % uuid.uuid1() 2506 m = mmap.mmap(-1, 1, tagname) 2507 m[0] = 0 2508 # Run a script which has console control handling enabled. 2509 proc = subprocess.Popen([sys.executable, 2510 os.path.join(os.path.dirname(__file__), 2511 "win_console_handler.py"), tagname], 2512 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP) 2513 # Let the interpreter startup before we send signals. See #3137. 2514 count, max = 0, 100 2515 while count < max and proc.poll() is None: 2516 if m[0] == 1: 2517 break 2518 time.sleep(0.1) 2519 count += 1 2520 else: 2521 # Forcefully kill the process if we weren't able to signal it. 2522 os.kill(proc.pid, signal.SIGINT) 2523 self.fail("Subprocess didn't finish initialization") 2524 os.kill(proc.pid, event) 2525 # proc.send_signal(event) could also be done here. 2526 # Allow time for the signal to be passed and the process to exit. 2527 time.sleep(0.5) 2528 if not proc.poll(): 2529 # Forcefully kill the process if we weren't able to signal it. 2530 os.kill(proc.pid, signal.SIGINT) 2531 self.fail("subprocess did not stop on {}".format(name)) 2532 2533 @unittest.skip("subprocesses aren't inheriting Ctrl+C property") 2534 @support.requires_subprocess() 2535 def test_CTRL_C_EVENT(self): 2536 from ctypes import wintypes 2537 import ctypes 2538 2539 # Make a NULL value by creating a pointer with no argument. 2540 NULL = ctypes.POINTER(ctypes.c_int)() 2541 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler 2542 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int), 2543 wintypes.BOOL) 2544 SetConsoleCtrlHandler.restype = wintypes.BOOL 2545 2546 # Calling this with NULL and FALSE causes the calling process to 2547 # handle Ctrl+C, rather than ignore it. This property is inherited 2548 # by subprocesses. 2549 SetConsoleCtrlHandler(NULL, 0) 2550 2551 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT") 2552 2553 @support.requires_subprocess() 2554 def test_CTRL_BREAK_EVENT(self): 2555 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT") 2556 2557 2558@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 2559class Win32ListdirTests(unittest.TestCase): 2560 """Test listdir on Windows.""" 2561 2562 def setUp(self): 2563 self.created_paths = [] 2564 for i in range(2): 2565 dir_name = 'SUB%d' % i 2566 dir_path = os.path.join(os_helper.TESTFN, dir_name) 2567 file_name = 'FILE%d' % i 2568 file_path = os.path.join(os_helper.TESTFN, file_name) 2569 os.makedirs(dir_path) 2570 with open(file_path, 'w', encoding='utf-8') as f: 2571 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path) 2572 self.created_paths.extend([dir_name, file_name]) 2573 self.created_paths.sort() 2574 2575 def tearDown(self): 2576 shutil.rmtree(os_helper.TESTFN) 2577 2578 def test_listdir_no_extended_path(self): 2579 """Test when the path is not an "extended" path.""" 2580 # unicode 2581 self.assertEqual( 2582 sorted(os.listdir(os_helper.TESTFN)), 2583 self.created_paths) 2584 2585 # bytes 2586 self.assertEqual( 2587 sorted(os.listdir(os.fsencode(os_helper.TESTFN))), 2588 [os.fsencode(path) for path in self.created_paths]) 2589 2590 def test_listdir_extended_path(self): 2591 """Test when the path starts with '\\\\?\\'.""" 2592 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath 2593 # unicode 2594 path = '\\\\?\\' + os.path.abspath(os_helper.TESTFN) 2595 self.assertEqual( 2596 sorted(os.listdir(path)), 2597 self.created_paths) 2598 2599 # bytes 2600 path = b'\\\\?\\' + os.fsencode(os.path.abspath(os_helper.TESTFN)) 2601 self.assertEqual( 2602 sorted(os.listdir(path)), 2603 [os.fsencode(path) for path in self.created_paths]) 2604 2605 2606@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()') 2607class ReadlinkTests(unittest.TestCase): 2608 filelink = 'readlinktest' 2609 filelink_target = os.path.abspath(__file__) 2610 filelinkb = os.fsencode(filelink) 2611 filelinkb_target = os.fsencode(filelink_target) 2612 2613 def assertPathEqual(self, left, right): 2614 left = os.path.normcase(left) 2615 right = os.path.normcase(right) 2616 if sys.platform == 'win32': 2617 # Bad practice to blindly strip the prefix as it may be required to 2618 # correctly refer to the file, but we're only comparing paths here. 2619 has_prefix = lambda p: p.startswith( 2620 b'\\\\?\\' if isinstance(p, bytes) else '\\\\?\\') 2621 if has_prefix(left): 2622 left = left[4:] 2623 if has_prefix(right): 2624 right = right[4:] 2625 self.assertEqual(left, right) 2626 2627 def setUp(self): 2628 self.assertTrue(os.path.exists(self.filelink_target)) 2629 self.assertTrue(os.path.exists(self.filelinkb_target)) 2630 self.assertFalse(os.path.exists(self.filelink)) 2631 self.assertFalse(os.path.exists(self.filelinkb)) 2632 2633 def test_not_symlink(self): 2634 filelink_target = FakePath(self.filelink_target) 2635 self.assertRaises(OSError, os.readlink, self.filelink_target) 2636 self.assertRaises(OSError, os.readlink, filelink_target) 2637 2638 def test_missing_link(self): 2639 self.assertRaises(FileNotFoundError, os.readlink, 'missing-link') 2640 self.assertRaises(FileNotFoundError, os.readlink, 2641 FakePath('missing-link')) 2642 2643 @os_helper.skip_unless_symlink 2644 def test_pathlike(self): 2645 os.symlink(self.filelink_target, self.filelink) 2646 self.addCleanup(os_helper.unlink, self.filelink) 2647 filelink = FakePath(self.filelink) 2648 self.assertPathEqual(os.readlink(filelink), self.filelink_target) 2649 2650 @os_helper.skip_unless_symlink 2651 def test_pathlike_bytes(self): 2652 os.symlink(self.filelinkb_target, self.filelinkb) 2653 self.addCleanup(os_helper.unlink, self.filelinkb) 2654 path = os.readlink(FakePath(self.filelinkb)) 2655 self.assertPathEqual(path, self.filelinkb_target) 2656 self.assertIsInstance(path, bytes) 2657 2658 @os_helper.skip_unless_symlink 2659 def test_bytes(self): 2660 os.symlink(self.filelinkb_target, self.filelinkb) 2661 self.addCleanup(os_helper.unlink, self.filelinkb) 2662 path = os.readlink(self.filelinkb) 2663 self.assertPathEqual(path, self.filelinkb_target) 2664 self.assertIsInstance(path, bytes) 2665 2666 2667@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 2668@os_helper.skip_unless_symlink 2669class Win32SymlinkTests(unittest.TestCase): 2670 filelink = 'filelinktest' 2671 filelink_target = os.path.abspath(__file__) 2672 dirlink = 'dirlinktest' 2673 dirlink_target = os.path.dirname(filelink_target) 2674 missing_link = 'missing link' 2675 2676 def setUp(self): 2677 assert os.path.exists(self.dirlink_target) 2678 assert os.path.exists(self.filelink_target) 2679 assert not os.path.exists(self.dirlink) 2680 assert not os.path.exists(self.filelink) 2681 assert not os.path.exists(self.missing_link) 2682 2683 def tearDown(self): 2684 if os.path.exists(self.filelink): 2685 os.remove(self.filelink) 2686 if os.path.exists(self.dirlink): 2687 os.rmdir(self.dirlink) 2688 if os.path.lexists(self.missing_link): 2689 os.remove(self.missing_link) 2690 2691 def test_directory_link(self): 2692 os.symlink(self.dirlink_target, self.dirlink) 2693 self.assertTrue(os.path.exists(self.dirlink)) 2694 self.assertTrue(os.path.isdir(self.dirlink)) 2695 self.assertTrue(os.path.islink(self.dirlink)) 2696 self.check_stat(self.dirlink, self.dirlink_target) 2697 2698 def test_file_link(self): 2699 os.symlink(self.filelink_target, self.filelink) 2700 self.assertTrue(os.path.exists(self.filelink)) 2701 self.assertTrue(os.path.isfile(self.filelink)) 2702 self.assertTrue(os.path.islink(self.filelink)) 2703 self.check_stat(self.filelink, self.filelink_target) 2704 2705 def _create_missing_dir_link(self): 2706 'Create a "directory" link to a non-existent target' 2707 linkname = self.missing_link 2708 if os.path.lexists(linkname): 2709 os.remove(linkname) 2710 target = r'c:\\target does not exist.29r3c740' 2711 assert not os.path.exists(target) 2712 target_is_dir = True 2713 os.symlink(target, linkname, target_is_dir) 2714 2715 def test_remove_directory_link_to_missing_target(self): 2716 self._create_missing_dir_link() 2717 # For compatibility with Unix, os.remove will check the 2718 # directory status and call RemoveDirectory if the symlink 2719 # was created with target_is_dir==True. 2720 os.remove(self.missing_link) 2721 2722 def test_isdir_on_directory_link_to_missing_target(self): 2723 self._create_missing_dir_link() 2724 self.assertFalse(os.path.isdir(self.missing_link)) 2725 2726 def test_rmdir_on_directory_link_to_missing_target(self): 2727 self._create_missing_dir_link() 2728 os.rmdir(self.missing_link) 2729 2730 def check_stat(self, link, target): 2731 self.assertEqual(os.stat(link), os.stat(target)) 2732 self.assertNotEqual(os.lstat(link), os.stat(link)) 2733 2734 bytes_link = os.fsencode(link) 2735 self.assertEqual(os.stat(bytes_link), os.stat(target)) 2736 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link)) 2737 2738 def test_12084(self): 2739 level1 = os.path.abspath(os_helper.TESTFN) 2740 level2 = os.path.join(level1, "level2") 2741 level3 = os.path.join(level2, "level3") 2742 self.addCleanup(os_helper.rmtree, level1) 2743 2744 os.mkdir(level1) 2745 os.mkdir(level2) 2746 os.mkdir(level3) 2747 2748 file1 = os.path.abspath(os.path.join(level1, "file1")) 2749 create_file(file1) 2750 2751 orig_dir = os.getcwd() 2752 try: 2753 os.chdir(level2) 2754 link = os.path.join(level2, "link") 2755 os.symlink(os.path.relpath(file1), "link") 2756 self.assertIn("link", os.listdir(os.getcwd())) 2757 2758 # Check os.stat calls from the same dir as the link 2759 self.assertEqual(os.stat(file1), os.stat("link")) 2760 2761 # Check os.stat calls from a dir below the link 2762 os.chdir(level1) 2763 self.assertEqual(os.stat(file1), 2764 os.stat(os.path.relpath(link))) 2765 2766 # Check os.stat calls from a dir above the link 2767 os.chdir(level3) 2768 self.assertEqual(os.stat(file1), 2769 os.stat(os.path.relpath(link))) 2770 finally: 2771 os.chdir(orig_dir) 2772 2773 @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users') 2774 and os.path.exists(r'C:\ProgramData'), 2775 'Test directories not found') 2776 def test_29248(self): 2777 # os.symlink() calls CreateSymbolicLink, which creates 2778 # the reparse data buffer with the print name stored 2779 # first, so the offset is always 0. CreateSymbolicLink 2780 # stores the "PrintName" DOS path (e.g. "C:\") first, 2781 # with an offset of 0, followed by the "SubstituteName" 2782 # NT path (e.g. "\??\C:\"). The "All Users" link, on 2783 # the other hand, seems to have been created manually 2784 # with an inverted order. 2785 target = os.readlink(r'C:\Users\All Users') 2786 self.assertTrue(os.path.samefile(target, r'C:\ProgramData')) 2787 2788 def test_buffer_overflow(self): 2789 # Older versions would have a buffer overflow when detecting 2790 # whether a link source was a directory. This test ensures we 2791 # no longer crash, but does not otherwise validate the behavior 2792 segment = 'X' * 27 2793 path = os.path.join(*[segment] * 10) 2794 test_cases = [ 2795 # overflow with absolute src 2796 ('\\' + path, segment), 2797 # overflow dest with relative src 2798 (segment, path), 2799 # overflow when joining src 2800 (path[:180], path[:180]), 2801 ] 2802 for src, dest in test_cases: 2803 try: 2804 os.symlink(src, dest) 2805 except FileNotFoundError: 2806 pass 2807 else: 2808 try: 2809 os.remove(dest) 2810 except OSError: 2811 pass 2812 # Also test with bytes, since that is a separate code path. 2813 try: 2814 os.symlink(os.fsencode(src), os.fsencode(dest)) 2815 except FileNotFoundError: 2816 pass 2817 else: 2818 try: 2819 os.remove(dest) 2820 except OSError: 2821 pass 2822 2823 def test_appexeclink(self): 2824 root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps') 2825 if not os.path.isdir(root): 2826 self.skipTest("test requires a WindowsApps directory") 2827 2828 aliases = [os.path.join(root, a) 2829 for a in fnmatch.filter(os.listdir(root), '*.exe')] 2830 2831 for alias in aliases: 2832 if support.verbose: 2833 print() 2834 print("Testing with", alias) 2835 st = os.lstat(alias) 2836 self.assertEqual(st, os.stat(alias)) 2837 self.assertFalse(stat.S_ISLNK(st.st_mode)) 2838 self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK) 2839 # testing the first one we see is sufficient 2840 break 2841 else: 2842 self.skipTest("test requires an app execution alias") 2843 2844@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 2845class Win32JunctionTests(unittest.TestCase): 2846 junction = 'junctiontest' 2847 junction_target = os.path.dirname(os.path.abspath(__file__)) 2848 2849 def setUp(self): 2850 assert os.path.exists(self.junction_target) 2851 assert not os.path.lexists(self.junction) 2852 2853 def tearDown(self): 2854 if os.path.lexists(self.junction): 2855 os.unlink(self.junction) 2856 2857 def test_create_junction(self): 2858 _winapi.CreateJunction(self.junction_target, self.junction) 2859 self.assertTrue(os.path.lexists(self.junction)) 2860 self.assertTrue(os.path.exists(self.junction)) 2861 self.assertTrue(os.path.isdir(self.junction)) 2862 self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction)) 2863 self.assertEqual(os.stat(self.junction), os.stat(self.junction_target)) 2864 2865 # bpo-37834: Junctions are not recognized as links. 2866 self.assertFalse(os.path.islink(self.junction)) 2867 self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target), 2868 os.path.normcase(os.readlink(self.junction))) 2869 2870 def test_unlink_removes_junction(self): 2871 _winapi.CreateJunction(self.junction_target, self.junction) 2872 self.assertTrue(os.path.exists(self.junction)) 2873 self.assertTrue(os.path.lexists(self.junction)) 2874 2875 os.unlink(self.junction) 2876 self.assertFalse(os.path.exists(self.junction)) 2877 2878@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 2879class Win32NtTests(unittest.TestCase): 2880 def test_getfinalpathname_handles(self): 2881 nt = import_helper.import_module('nt') 2882 ctypes = import_helper.import_module('ctypes') 2883 import ctypes.wintypes 2884 2885 kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True) 2886 kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE 2887 2888 kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL 2889 kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE, 2890 ctypes.wintypes.LPDWORD) 2891 2892 # This is a pseudo-handle that doesn't need to be closed 2893 hproc = kernel.GetCurrentProcess() 2894 2895 handle_count = ctypes.wintypes.DWORD() 2896 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count)) 2897 self.assertEqual(1, ok) 2898 2899 before_count = handle_count.value 2900 2901 # The first two test the error path, __file__ tests the success path 2902 filenames = [ 2903 r'\\?\C:', 2904 r'\\?\NUL', 2905 r'\\?\CONIN', 2906 __file__, 2907 ] 2908 2909 for _ in range(10): 2910 for name in filenames: 2911 try: 2912 nt._getfinalpathname(name) 2913 except Exception: 2914 # Failure is expected 2915 pass 2916 try: 2917 os.stat(name) 2918 except Exception: 2919 pass 2920 2921 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count)) 2922 self.assertEqual(1, ok) 2923 2924 handle_delta = handle_count.value - before_count 2925 2926 self.assertEqual(0, handle_delta) 2927 2928 @support.requires_subprocess() 2929 def test_stat_unlink_race(self): 2930 # bpo-46785: the implementation of os.stat() falls back to reading 2931 # the parent directory if CreateFileW() fails with a permission 2932 # error. If reading the parent directory fails because the file or 2933 # directory are subsequently unlinked, or because the volume or 2934 # share are no longer available, then the original permission error 2935 # should not be restored. 2936 filename = os_helper.TESTFN 2937 self.addCleanup(os_helper.unlink, filename) 2938 deadline = time.time() + 5 2939 command = textwrap.dedent("""\ 2940 import os 2941 import sys 2942 import time 2943 2944 filename = sys.argv[1] 2945 deadline = float(sys.argv[2]) 2946 2947 while time.time() < deadline: 2948 try: 2949 with open(filename, "w") as f: 2950 pass 2951 except OSError: 2952 pass 2953 try: 2954 os.remove(filename) 2955 except OSError: 2956 pass 2957 """) 2958 2959 with subprocess.Popen([sys.executable, '-c', command, filename, str(deadline)]) as proc: 2960 while time.time() < deadline: 2961 try: 2962 os.stat(filename) 2963 except FileNotFoundError as e: 2964 assert e.winerror == 2 # ERROR_FILE_NOT_FOUND 2965 try: 2966 proc.wait(1) 2967 except subprocess.TimeoutExpired: 2968 proc.terminate() 2969 2970 2971@os_helper.skip_unless_symlink 2972class NonLocalSymlinkTests(unittest.TestCase): 2973 2974 def setUp(self): 2975 r""" 2976 Create this structure: 2977 2978 base 2979 \___ some_dir 2980 """ 2981 os.makedirs('base/some_dir') 2982 2983 def tearDown(self): 2984 shutil.rmtree('base') 2985 2986 def test_directory_link_nonlocal(self): 2987 """ 2988 The symlink target should resolve relative to the link, not relative 2989 to the current directory. 2990 2991 Then, link base/some_link -> base/some_dir and ensure that some_link 2992 is resolved as a directory. 2993 2994 In issue13772, it was discovered that directory detection failed if 2995 the symlink target was not specified relative to the current 2996 directory, which was a defect in the implementation. 2997 """ 2998 src = os.path.join('base', 'some_link') 2999 os.symlink('some_dir', src) 3000 assert os.path.isdir(src) 3001 3002 3003class FSEncodingTests(unittest.TestCase): 3004 def test_nop(self): 3005 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff') 3006 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141') 3007 3008 def test_identity(self): 3009 # assert fsdecode(fsencode(x)) == x 3010 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'): 3011 try: 3012 bytesfn = os.fsencode(fn) 3013 except UnicodeEncodeError: 3014 continue 3015 self.assertEqual(os.fsdecode(bytesfn), fn) 3016 3017 3018 3019class DeviceEncodingTests(unittest.TestCase): 3020 3021 def test_bad_fd(self): 3022 # Return None when an fd doesn't actually exist. 3023 self.assertIsNone(os.device_encoding(123456)) 3024 3025 @unittest.skipUnless(os.isatty(0) and not win32_is_iot() and (sys.platform.startswith('win') or 3026 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))), 3027 'test requires a tty and either Windows or nl_langinfo(CODESET)') 3028 @unittest.skipIf( 3029 support.is_emscripten, "Cannot get encoding of stdin on Emscripten" 3030 ) 3031 def test_device_encoding(self): 3032 encoding = os.device_encoding(0) 3033 self.assertIsNotNone(encoding) 3034 self.assertTrue(codecs.lookup(encoding)) 3035 3036 3037@support.requires_subprocess() 3038class PidTests(unittest.TestCase): 3039 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid") 3040 def test_getppid(self): 3041 p = subprocess.Popen([sys.executable, '-c', 3042 'import os; print(os.getppid())'], 3043 stdout=subprocess.PIPE) 3044 stdout, _ = p.communicate() 3045 # We are the parent of our subprocess 3046 self.assertEqual(int(stdout), os.getpid()) 3047 3048 def check_waitpid(self, code, exitcode, callback=None): 3049 if sys.platform == 'win32': 3050 # On Windows, os.spawnv() simply joins arguments with spaces: 3051 # arguments need to be quoted 3052 args = [f'"{sys.executable}"', '-c', f'"{code}"'] 3053 else: 3054 args = [sys.executable, '-c', code] 3055 pid = os.spawnv(os.P_NOWAIT, sys.executable, args) 3056 3057 if callback is not None: 3058 callback(pid) 3059 3060 # don't use support.wait_process() to test directly os.waitpid() 3061 # and os.waitstatus_to_exitcode() 3062 pid2, status = os.waitpid(pid, 0) 3063 self.assertEqual(os.waitstatus_to_exitcode(status), exitcode) 3064 self.assertEqual(pid2, pid) 3065 3066 def test_waitpid(self): 3067 self.check_waitpid(code='pass', exitcode=0) 3068 3069 def test_waitstatus_to_exitcode(self): 3070 exitcode = 23 3071 code = f'import sys; sys.exit({exitcode})' 3072 self.check_waitpid(code, exitcode=exitcode) 3073 3074 with self.assertRaises(TypeError): 3075 os.waitstatus_to_exitcode(0.0) 3076 3077 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test') 3078 def test_waitpid_windows(self): 3079 # bpo-40138: test os.waitpid() and os.waitstatus_to_exitcode() 3080 # with exit code larger than INT_MAX. 3081 STATUS_CONTROL_C_EXIT = 0xC000013A 3082 code = f'import _winapi; _winapi.ExitProcess({STATUS_CONTROL_C_EXIT})' 3083 self.check_waitpid(code, exitcode=STATUS_CONTROL_C_EXIT) 3084 3085 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test') 3086 def test_waitstatus_to_exitcode_windows(self): 3087 max_exitcode = 2 ** 32 - 1 3088 for exitcode in (0, 1, 5, max_exitcode): 3089 self.assertEqual(os.waitstatus_to_exitcode(exitcode << 8), 3090 exitcode) 3091 3092 # invalid values 3093 with self.assertRaises(ValueError): 3094 os.waitstatus_to_exitcode((max_exitcode + 1) << 8) 3095 with self.assertRaises(OverflowError): 3096 os.waitstatus_to_exitcode(-1) 3097 3098 # Skip the test on Windows 3099 @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'need signal.SIGKILL') 3100 def test_waitstatus_to_exitcode_kill(self): 3101 code = f'import time; time.sleep({support.LONG_TIMEOUT})' 3102 signum = signal.SIGKILL 3103 3104 def kill_process(pid): 3105 os.kill(pid, signum) 3106 3107 self.check_waitpid(code, exitcode=-signum, callback=kill_process) 3108 3109 3110@support.requires_subprocess() 3111class SpawnTests(unittest.TestCase): 3112 @staticmethod 3113 def quote_args(args): 3114 # On Windows, os.spawn* simply joins arguments with spaces: 3115 # arguments need to be quoted 3116 if os.name != 'nt': 3117 return args 3118 return [f'"{arg}"' if " " in arg.strip() else arg for arg in args] 3119 3120 def create_args(self, *, with_env=False, use_bytes=False): 3121 self.exitcode = 17 3122 3123 filename = os_helper.TESTFN 3124 self.addCleanup(os_helper.unlink, filename) 3125 3126 if not with_env: 3127 code = 'import sys; sys.exit(%s)' % self.exitcode 3128 else: 3129 self.env = dict(os.environ) 3130 # create an unique key 3131 self.key = str(uuid.uuid4()) 3132 self.env[self.key] = self.key 3133 # read the variable from os.environ to check that it exists 3134 code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)' 3135 % (self.key, self.exitcode)) 3136 3137 with open(filename, "w", encoding="utf-8") as fp: 3138 fp.write(code) 3139 3140 program = sys.executable 3141 args = self.quote_args([program, filename]) 3142 if use_bytes: 3143 program = os.fsencode(program) 3144 args = [os.fsencode(a) for a in args] 3145 self.env = {os.fsencode(k): os.fsencode(v) 3146 for k, v in self.env.items()} 3147 3148 return program, args 3149 3150 @requires_os_func('spawnl') 3151 def test_spawnl(self): 3152 program, args = self.create_args() 3153 exitcode = os.spawnl(os.P_WAIT, program, *args) 3154 self.assertEqual(exitcode, self.exitcode) 3155 3156 @requires_os_func('spawnle') 3157 def test_spawnle(self): 3158 program, args = self.create_args(with_env=True) 3159 exitcode = os.spawnle(os.P_WAIT, program, *args, self.env) 3160 self.assertEqual(exitcode, self.exitcode) 3161 3162 @requires_os_func('spawnlp') 3163 def test_spawnlp(self): 3164 program, args = self.create_args() 3165 exitcode = os.spawnlp(os.P_WAIT, program, *args) 3166 self.assertEqual(exitcode, self.exitcode) 3167 3168 @requires_os_func('spawnlpe') 3169 def test_spawnlpe(self): 3170 program, args = self.create_args(with_env=True) 3171 exitcode = os.spawnlpe(os.P_WAIT, program, *args, self.env) 3172 self.assertEqual(exitcode, self.exitcode) 3173 3174 @requires_os_func('spawnv') 3175 def test_spawnv(self): 3176 program, args = self.create_args() 3177 exitcode = os.spawnv(os.P_WAIT, program, args) 3178 self.assertEqual(exitcode, self.exitcode) 3179 3180 # Test for PyUnicode_FSConverter() 3181 exitcode = os.spawnv(os.P_WAIT, FakePath(program), args) 3182 self.assertEqual(exitcode, self.exitcode) 3183 3184 @requires_os_func('spawnve') 3185 def test_spawnve(self): 3186 program, args = self.create_args(with_env=True) 3187 exitcode = os.spawnve(os.P_WAIT, program, args, self.env) 3188 self.assertEqual(exitcode, self.exitcode) 3189 3190 @requires_os_func('spawnvp') 3191 def test_spawnvp(self): 3192 program, args = self.create_args() 3193 exitcode = os.spawnvp(os.P_WAIT, program, args) 3194 self.assertEqual(exitcode, self.exitcode) 3195 3196 @requires_os_func('spawnvpe') 3197 def test_spawnvpe(self): 3198 program, args = self.create_args(with_env=True) 3199 exitcode = os.spawnvpe(os.P_WAIT, program, args, self.env) 3200 self.assertEqual(exitcode, self.exitcode) 3201 3202 @requires_os_func('spawnv') 3203 def test_nowait(self): 3204 program, args = self.create_args() 3205 pid = os.spawnv(os.P_NOWAIT, program, args) 3206 support.wait_process(pid, exitcode=self.exitcode) 3207 3208 @requires_os_func('spawnve') 3209 def test_spawnve_bytes(self): 3210 # Test bytes handling in parse_arglist and parse_envlist (#28114) 3211 program, args = self.create_args(with_env=True, use_bytes=True) 3212 exitcode = os.spawnve(os.P_WAIT, program, args, self.env) 3213 self.assertEqual(exitcode, self.exitcode) 3214 3215 @requires_os_func('spawnl') 3216 def test_spawnl_noargs(self): 3217 program, __ = self.create_args() 3218 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, program) 3219 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, program, '') 3220 3221 @requires_os_func('spawnle') 3222 def test_spawnle_noargs(self): 3223 program, __ = self.create_args() 3224 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, program, {}) 3225 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, program, '', {}) 3226 3227 @requires_os_func('spawnv') 3228 def test_spawnv_noargs(self): 3229 program, __ = self.create_args() 3230 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, program, ()) 3231 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, program, []) 3232 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, program, ('',)) 3233 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, program, ['']) 3234 3235 @requires_os_func('spawnve') 3236 def test_spawnve_noargs(self): 3237 program, __ = self.create_args() 3238 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, program, (), {}) 3239 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, program, [], {}) 3240 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, program, ('',), {}) 3241 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, program, [''], {}) 3242 3243 def _test_invalid_env(self, spawn): 3244 program = sys.executable 3245 args = self.quote_args([program, '-c', 'pass']) 3246 3247 # null character in the environment variable name 3248 newenv = os.environ.copy() 3249 newenv["FRUIT\0VEGETABLE"] = "cabbage" 3250 try: 3251 exitcode = spawn(os.P_WAIT, program, args, newenv) 3252 except ValueError: 3253 pass 3254 else: 3255 self.assertEqual(exitcode, 127) 3256 3257 # null character in the environment variable value 3258 newenv = os.environ.copy() 3259 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage" 3260 try: 3261 exitcode = spawn(os.P_WAIT, program, args, newenv) 3262 except ValueError: 3263 pass 3264 else: 3265 self.assertEqual(exitcode, 127) 3266 3267 # equal character in the environment variable name 3268 newenv = os.environ.copy() 3269 newenv["FRUIT=ORANGE"] = "lemon" 3270 try: 3271 exitcode = spawn(os.P_WAIT, program, args, newenv) 3272 except ValueError: 3273 pass 3274 else: 3275 self.assertEqual(exitcode, 127) 3276 3277 # equal character in the environment variable value 3278 filename = os_helper.TESTFN 3279 self.addCleanup(os_helper.unlink, filename) 3280 with open(filename, "w", encoding="utf-8") as fp: 3281 fp.write('import sys, os\n' 3282 'if os.getenv("FRUIT") != "orange=lemon":\n' 3283 ' raise AssertionError') 3284 3285 args = self.quote_args([program, filename]) 3286 newenv = os.environ.copy() 3287 newenv["FRUIT"] = "orange=lemon" 3288 exitcode = spawn(os.P_WAIT, program, args, newenv) 3289 self.assertEqual(exitcode, 0) 3290 3291 @requires_os_func('spawnve') 3292 def test_spawnve_invalid_env(self): 3293 self._test_invalid_env(os.spawnve) 3294 3295 @requires_os_func('spawnvpe') 3296 def test_spawnvpe_invalid_env(self): 3297 self._test_invalid_env(os.spawnvpe) 3298 3299 3300# The introduction of this TestCase caused at least two different errors on 3301# *nix buildbots. Temporarily skip this to let the buildbots move along. 3302@unittest.skip("Skip due to platform/environment differences on *NIX buildbots") 3303@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin") 3304class LoginTests(unittest.TestCase): 3305 def test_getlogin(self): 3306 user_name = os.getlogin() 3307 self.assertNotEqual(len(user_name), 0) 3308 3309 3310@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'), 3311 "needs os.getpriority and os.setpriority") 3312class ProgramPriorityTests(unittest.TestCase): 3313 """Tests for os.getpriority() and os.setpriority().""" 3314 3315 def test_set_get_priority(self): 3316 3317 base = os.getpriority(os.PRIO_PROCESS, os.getpid()) 3318 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1) 3319 try: 3320 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid()) 3321 if base >= 19 and new_prio <= 19: 3322 raise unittest.SkipTest("unable to reliably test setpriority " 3323 "at current nice level of %s" % base) 3324 else: 3325 self.assertEqual(new_prio, base + 1) 3326 finally: 3327 try: 3328 os.setpriority(os.PRIO_PROCESS, os.getpid(), base) 3329 except OSError as err: 3330 if err.errno != errno.EACCES: 3331 raise 3332 3333 3334@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()") 3335class TestSendfile(unittest.IsolatedAsyncioTestCase): 3336 3337 DATA = b"12345abcde" * 16 * 1024 # 160 KiB 3338 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \ 3339 not sys.platform.startswith("solaris") and \ 3340 not sys.platform.startswith("sunos") 3341 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS, 3342 'requires headers and trailers support') 3343 requires_32b = unittest.skipUnless(sys.maxsize < 2**32, 3344 'test is only meaningful on 32-bit builds') 3345 3346 @classmethod 3347 def setUpClass(cls): 3348 create_file(os_helper.TESTFN, cls.DATA) 3349 3350 @classmethod 3351 def tearDownClass(cls): 3352 os_helper.unlink(os_helper.TESTFN) 3353 3354 @staticmethod 3355 async def chunks(reader): 3356 while not reader.at_eof(): 3357 yield await reader.read() 3358 3359 async def handle_new_client(self, reader, writer): 3360 self.server_buffer = b''.join([x async for x in self.chunks(reader)]) 3361 writer.close() 3362 self.server.close() # The test server processes a single client only 3363 3364 async def asyncSetUp(self): 3365 self.server_buffer = b'' 3366 self.server = await asyncio.start_server(self.handle_new_client, 3367 socket_helper.HOSTv4) 3368 server_name = self.server.sockets[0].getsockname() 3369 self.client = socket.socket() 3370 self.client.setblocking(False) 3371 await asyncio.get_running_loop().sock_connect(self.client, server_name) 3372 self.sockno = self.client.fileno() 3373 self.file = open(os_helper.TESTFN, 'rb') 3374 self.fileno = self.file.fileno() 3375 3376 async def asyncTearDown(self): 3377 self.file.close() 3378 self.client.close() 3379 await self.server.wait_closed() 3380 3381 # Use the test subject instead of asyncio.loop.sendfile 3382 @staticmethod 3383 async def async_sendfile(*args, **kwargs): 3384 return await asyncio.to_thread(os.sendfile, *args, **kwargs) 3385 3386 @staticmethod 3387 async def sendfile_wrapper(*args, **kwargs): 3388 """A higher level wrapper representing how an application is 3389 supposed to use sendfile(). 3390 """ 3391 while True: 3392 try: 3393 return await TestSendfile.async_sendfile(*args, **kwargs) 3394 except OSError as err: 3395 if err.errno == errno.ECONNRESET: 3396 # disconnected 3397 raise 3398 elif err.errno in (errno.EAGAIN, errno.EBUSY): 3399 # we have to retry send data 3400 continue 3401 else: 3402 raise 3403 3404 async def test_send_whole_file(self): 3405 # normal send 3406 total_sent = 0 3407 offset = 0 3408 nbytes = 4096 3409 while total_sent < len(self.DATA): 3410 sent = await self.sendfile_wrapper(self.sockno, self.fileno, 3411 offset, nbytes) 3412 if sent == 0: 3413 break 3414 offset += sent 3415 total_sent += sent 3416 self.assertTrue(sent <= nbytes) 3417 self.assertEqual(offset, total_sent) 3418 3419 self.assertEqual(total_sent, len(self.DATA)) 3420 self.client.shutdown(socket.SHUT_RDWR) 3421 self.client.close() 3422 await self.server.wait_closed() 3423 self.assertEqual(len(self.server_buffer), len(self.DATA)) 3424 self.assertEqual(self.server_buffer, self.DATA) 3425 3426 async def test_send_at_certain_offset(self): 3427 # start sending a file at a certain offset 3428 total_sent = 0 3429 offset = len(self.DATA) // 2 3430 must_send = len(self.DATA) - offset 3431 nbytes = 4096 3432 while total_sent < must_send: 3433 sent = await self.sendfile_wrapper(self.sockno, self.fileno, 3434 offset, nbytes) 3435 if sent == 0: 3436 break 3437 offset += sent 3438 total_sent += sent 3439 self.assertTrue(sent <= nbytes) 3440 3441 self.client.shutdown(socket.SHUT_RDWR) 3442 self.client.close() 3443 await self.server.wait_closed() 3444 expected = self.DATA[len(self.DATA) // 2:] 3445 self.assertEqual(total_sent, len(expected)) 3446 self.assertEqual(len(self.server_buffer), len(expected)) 3447 self.assertEqual(self.server_buffer, expected) 3448 3449 async def test_offset_overflow(self): 3450 # specify an offset > file size 3451 offset = len(self.DATA) + 4096 3452 try: 3453 sent = await self.async_sendfile(self.sockno, self.fileno, 3454 offset, 4096) 3455 except OSError as e: 3456 # Solaris can raise EINVAL if offset >= file length, ignore. 3457 if e.errno != errno.EINVAL: 3458 raise 3459 else: 3460 self.assertEqual(sent, 0) 3461 self.client.shutdown(socket.SHUT_RDWR) 3462 self.client.close() 3463 await self.server.wait_closed() 3464 self.assertEqual(self.server_buffer, b'') 3465 3466 async def test_invalid_offset(self): 3467 with self.assertRaises(OSError) as cm: 3468 await self.async_sendfile(self.sockno, self.fileno, -1, 4096) 3469 self.assertEqual(cm.exception.errno, errno.EINVAL) 3470 3471 async def test_keywords(self): 3472 # Keyword arguments should be supported 3473 await self.async_sendfile(out_fd=self.sockno, in_fd=self.fileno, 3474 offset=0, count=4096) 3475 if self.SUPPORT_HEADERS_TRAILERS: 3476 await self.async_sendfile(out_fd=self.sockno, in_fd=self.fileno, 3477 offset=0, count=4096, 3478 headers=(), trailers=(), flags=0) 3479 3480 # --- headers / trailers tests 3481 3482 @requires_headers_trailers 3483 async def test_headers(self): 3484 total_sent = 0 3485 expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1] 3486 sent = await self.async_sendfile(self.sockno, self.fileno, 0, 4096, 3487 headers=[b"x" * 512, b"y" * 256]) 3488 self.assertLessEqual(sent, 512 + 256 + 4096) 3489 total_sent += sent 3490 offset = 4096 3491 while total_sent < len(expected_data): 3492 nbytes = min(len(expected_data) - total_sent, 4096) 3493 sent = await self.sendfile_wrapper(self.sockno, self.fileno, 3494 offset, nbytes) 3495 if sent == 0: 3496 break 3497 self.assertLessEqual(sent, nbytes) 3498 total_sent += sent 3499 offset += sent 3500 3501 self.assertEqual(total_sent, len(expected_data)) 3502 self.client.close() 3503 await self.server.wait_closed() 3504 self.assertEqual(hash(self.server_buffer), hash(expected_data)) 3505 3506 @requires_headers_trailers 3507 async def test_trailers(self): 3508 TESTFN2 = os_helper.TESTFN + "2" 3509 file_data = b"abcdef" 3510 3511 self.addCleanup(os_helper.unlink, TESTFN2) 3512 create_file(TESTFN2, file_data) 3513 3514 with open(TESTFN2, 'rb') as f: 3515 await self.async_sendfile(self.sockno, f.fileno(), 0, 5, 3516 trailers=[b"123456", b"789"]) 3517 self.client.close() 3518 await self.server.wait_closed() 3519 self.assertEqual(self.server_buffer, b"abcde123456789") 3520 3521 @requires_headers_trailers 3522 @requires_32b 3523 async def test_headers_overflow_32bits(self): 3524 self.server.handler_instance.accumulate = False 3525 with self.assertRaises(OSError) as cm: 3526 await self.async_sendfile(self.sockno, self.fileno, 0, 0, 3527 headers=[b"x" * 2**16] * 2**15) 3528 self.assertEqual(cm.exception.errno, errno.EINVAL) 3529 3530 @requires_headers_trailers 3531 @requires_32b 3532 async def test_trailers_overflow_32bits(self): 3533 self.server.handler_instance.accumulate = False 3534 with self.assertRaises(OSError) as cm: 3535 await self.async_sendfile(self.sockno, self.fileno, 0, 0, 3536 trailers=[b"x" * 2**16] * 2**15) 3537 self.assertEqual(cm.exception.errno, errno.EINVAL) 3538 3539 @requires_headers_trailers 3540 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'), 3541 'test needs os.SF_NODISKIO') 3542 async def test_flags(self): 3543 try: 3544 await self.async_sendfile(self.sockno, self.fileno, 0, 4096, 3545 flags=os.SF_NODISKIO) 3546 except OSError as err: 3547 if err.errno not in (errno.EBUSY, errno.EAGAIN): 3548 raise 3549 3550 3551def supports_extended_attributes(): 3552 if not hasattr(os, "setxattr"): 3553 return False 3554 3555 try: 3556 with open(os_helper.TESTFN, "xb", 0) as fp: 3557 try: 3558 os.setxattr(fp.fileno(), b"user.test", b"") 3559 except OSError: 3560 return False 3561 finally: 3562 os_helper.unlink(os_helper.TESTFN) 3563 3564 return True 3565 3566 3567@unittest.skipUnless(supports_extended_attributes(), 3568 "no non-broken extended attribute support") 3569# Kernels < 2.6.39 don't respect setxattr flags. 3570@support.requires_linux_version(2, 6, 39) 3571class ExtendedAttributeTests(unittest.TestCase): 3572 3573 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs): 3574 fn = os_helper.TESTFN 3575 self.addCleanup(os_helper.unlink, fn) 3576 create_file(fn) 3577 3578 with self.assertRaises(OSError) as cm: 3579 getxattr(fn, s("user.test"), **kwargs) 3580 self.assertEqual(cm.exception.errno, errno.ENODATA) 3581 3582 init_xattr = listxattr(fn) 3583 self.assertIsInstance(init_xattr, list) 3584 3585 setxattr(fn, s("user.test"), b"", **kwargs) 3586 xattr = set(init_xattr) 3587 xattr.add("user.test") 3588 self.assertEqual(set(listxattr(fn)), xattr) 3589 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"") 3590 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs) 3591 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello") 3592 3593 with self.assertRaises(OSError) as cm: 3594 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs) 3595 self.assertEqual(cm.exception.errno, errno.EEXIST) 3596 3597 with self.assertRaises(OSError) as cm: 3598 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs) 3599 self.assertEqual(cm.exception.errno, errno.ENODATA) 3600 3601 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs) 3602 xattr.add("user.test2") 3603 self.assertEqual(set(listxattr(fn)), xattr) 3604 removexattr(fn, s("user.test"), **kwargs) 3605 3606 with self.assertRaises(OSError) as cm: 3607 getxattr(fn, s("user.test"), **kwargs) 3608 self.assertEqual(cm.exception.errno, errno.ENODATA) 3609 3610 xattr.remove("user.test") 3611 self.assertEqual(set(listxattr(fn)), xattr) 3612 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo") 3613 setxattr(fn, s("user.test"), b"a"*1024, **kwargs) 3614 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024) 3615 removexattr(fn, s("user.test"), **kwargs) 3616 many = sorted("user.test{}".format(i) for i in range(100)) 3617 for thing in many: 3618 setxattr(fn, thing, b"x", **kwargs) 3619 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many)) 3620 3621 def _check_xattrs(self, *args, **kwargs): 3622 self._check_xattrs_str(str, *args, **kwargs) 3623 os_helper.unlink(os_helper.TESTFN) 3624 3625 self._check_xattrs_str(os.fsencode, *args, **kwargs) 3626 os_helper.unlink(os_helper.TESTFN) 3627 3628 def test_simple(self): 3629 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr, 3630 os.listxattr) 3631 3632 def test_lpath(self): 3633 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr, 3634 os.listxattr, follow_symlinks=False) 3635 3636 def test_fds(self): 3637 def getxattr(path, *args): 3638 with open(path, "rb") as fp: 3639 return os.getxattr(fp.fileno(), *args) 3640 def setxattr(path, *args): 3641 with open(path, "wb", 0) as fp: 3642 os.setxattr(fp.fileno(), *args) 3643 def removexattr(path, *args): 3644 with open(path, "wb", 0) as fp: 3645 os.removexattr(fp.fileno(), *args) 3646 def listxattr(path, *args): 3647 with open(path, "rb") as fp: 3648 return os.listxattr(fp.fileno(), *args) 3649 self._check_xattrs(getxattr, setxattr, removexattr, listxattr) 3650 3651 3652@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size") 3653class TermsizeTests(unittest.TestCase): 3654 def test_does_not_crash(self): 3655 """Check if get_terminal_size() returns a meaningful value. 3656 3657 There's no easy portable way to actually check the size of the 3658 terminal, so let's check if it returns something sensible instead. 3659 """ 3660 try: 3661 size = os.get_terminal_size() 3662 except OSError as e: 3663 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY): 3664 # Under win32 a generic OSError can be thrown if the 3665 # handle cannot be retrieved 3666 self.skipTest("failed to query terminal size") 3667 raise 3668 3669 self.assertGreaterEqual(size.columns, 0) 3670 self.assertGreaterEqual(size.lines, 0) 3671 3672 def test_stty_match(self): 3673 """Check if stty returns the same results 3674 3675 stty actually tests stdin, so get_terminal_size is invoked on 3676 stdin explicitly. If stty succeeded, then get_terminal_size() 3677 should work too. 3678 """ 3679 try: 3680 size = ( 3681 subprocess.check_output( 3682 ["stty", "size"], stderr=subprocess.DEVNULL, text=True 3683 ).split() 3684 ) 3685 except (FileNotFoundError, subprocess.CalledProcessError, 3686 PermissionError): 3687 self.skipTest("stty invocation failed") 3688 expected = (int(size[1]), int(size[0])) # reversed order 3689 3690 try: 3691 actual = os.get_terminal_size(sys.__stdin__.fileno()) 3692 except OSError as e: 3693 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY): 3694 # Under win32 a generic OSError can be thrown if the 3695 # handle cannot be retrieved 3696 self.skipTest("failed to query terminal size") 3697 raise 3698 self.assertEqual(expected, actual) 3699 3700 3701@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create') 3702@support.requires_linux_version(3, 17) 3703class MemfdCreateTests(unittest.TestCase): 3704 def test_memfd_create(self): 3705 fd = os.memfd_create("Hi", os.MFD_CLOEXEC) 3706 self.assertNotEqual(fd, -1) 3707 self.addCleanup(os.close, fd) 3708 self.assertFalse(os.get_inheritable(fd)) 3709 with open(fd, "wb", closefd=False) as f: 3710 f.write(b'memfd_create') 3711 self.assertEqual(f.tell(), 12) 3712 3713 fd2 = os.memfd_create("Hi") 3714 self.addCleanup(os.close, fd2) 3715 self.assertFalse(os.get_inheritable(fd2)) 3716 3717 3718@unittest.skipUnless(hasattr(os, 'eventfd'), 'requires os.eventfd') 3719@support.requires_linux_version(2, 6, 30) 3720class EventfdTests(unittest.TestCase): 3721 def test_eventfd_initval(self): 3722 def pack(value): 3723 """Pack as native uint64_t 3724 """ 3725 return struct.pack("@Q", value) 3726 size = 8 # read/write 8 bytes 3727 initval = 42 3728 fd = os.eventfd(initval) 3729 self.assertNotEqual(fd, -1) 3730 self.addCleanup(os.close, fd) 3731 self.assertFalse(os.get_inheritable(fd)) 3732 3733 # test with raw read/write 3734 res = os.read(fd, size) 3735 self.assertEqual(res, pack(initval)) 3736 3737 os.write(fd, pack(23)) 3738 res = os.read(fd, size) 3739 self.assertEqual(res, pack(23)) 3740 3741 os.write(fd, pack(40)) 3742 os.write(fd, pack(2)) 3743 res = os.read(fd, size) 3744 self.assertEqual(res, pack(42)) 3745 3746 # test with eventfd_read/eventfd_write 3747 os.eventfd_write(fd, 20) 3748 os.eventfd_write(fd, 3) 3749 res = os.eventfd_read(fd) 3750 self.assertEqual(res, 23) 3751 3752 def test_eventfd_semaphore(self): 3753 initval = 2 3754 flags = os.EFD_CLOEXEC | os.EFD_SEMAPHORE | os.EFD_NONBLOCK 3755 fd = os.eventfd(initval, flags) 3756 self.assertNotEqual(fd, -1) 3757 self.addCleanup(os.close, fd) 3758 3759 # semaphore starts has initval 2, two reads return '1' 3760 res = os.eventfd_read(fd) 3761 self.assertEqual(res, 1) 3762 res = os.eventfd_read(fd) 3763 self.assertEqual(res, 1) 3764 # third read would block 3765 with self.assertRaises(BlockingIOError): 3766 os.eventfd_read(fd) 3767 with self.assertRaises(BlockingIOError): 3768 os.read(fd, 8) 3769 3770 # increase semaphore counter, read one 3771 os.eventfd_write(fd, 1) 3772 res = os.eventfd_read(fd) 3773 self.assertEqual(res, 1) 3774 # next read would block, too 3775 with self.assertRaises(BlockingIOError): 3776 os.eventfd_read(fd) 3777 3778 def test_eventfd_select(self): 3779 flags = os.EFD_CLOEXEC | os.EFD_NONBLOCK 3780 fd = os.eventfd(0, flags) 3781 self.assertNotEqual(fd, -1) 3782 self.addCleanup(os.close, fd) 3783 3784 # counter is zero, only writeable 3785 rfd, wfd, xfd = select.select([fd], [fd], [fd], 0) 3786 self.assertEqual((rfd, wfd, xfd), ([], [fd], [])) 3787 3788 # counter is non-zero, read and writeable 3789 os.eventfd_write(fd, 23) 3790 rfd, wfd, xfd = select.select([fd], [fd], [fd], 0) 3791 self.assertEqual((rfd, wfd, xfd), ([fd], [fd], [])) 3792 self.assertEqual(os.eventfd_read(fd), 23) 3793 3794 # counter at max, only readable 3795 os.eventfd_write(fd, (2**64) - 2) 3796 rfd, wfd, xfd = select.select([fd], [fd], [fd], 0) 3797 self.assertEqual((rfd, wfd, xfd), ([fd], [], [])) 3798 os.eventfd_read(fd) 3799 3800 3801class OSErrorTests(unittest.TestCase): 3802 def setUp(self): 3803 class Str(str): 3804 pass 3805 3806 self.bytes_filenames = [] 3807 self.unicode_filenames = [] 3808 if os_helper.TESTFN_UNENCODABLE is not None: 3809 decoded = os_helper.TESTFN_UNENCODABLE 3810 else: 3811 decoded = os_helper.TESTFN 3812 self.unicode_filenames.append(decoded) 3813 self.unicode_filenames.append(Str(decoded)) 3814 if os_helper.TESTFN_UNDECODABLE is not None: 3815 encoded = os_helper.TESTFN_UNDECODABLE 3816 else: 3817 encoded = os.fsencode(os_helper.TESTFN) 3818 self.bytes_filenames.append(encoded) 3819 self.bytes_filenames.append(bytearray(encoded)) 3820 self.bytes_filenames.append(memoryview(encoded)) 3821 3822 self.filenames = self.bytes_filenames + self.unicode_filenames 3823 3824 def test_oserror_filename(self): 3825 funcs = [ 3826 (self.filenames, os.chdir,), 3827 (self.filenames, os.lstat,), 3828 (self.filenames, os.open, os.O_RDONLY), 3829 (self.filenames, os.rmdir,), 3830 (self.filenames, os.stat,), 3831 (self.filenames, os.unlink,), 3832 ] 3833 if sys.platform == "win32": 3834 funcs.extend(( 3835 (self.bytes_filenames, os.rename, b"dst"), 3836 (self.bytes_filenames, os.replace, b"dst"), 3837 (self.unicode_filenames, os.rename, "dst"), 3838 (self.unicode_filenames, os.replace, "dst"), 3839 (self.unicode_filenames, os.listdir, ), 3840 )) 3841 else: 3842 funcs.extend(( 3843 (self.filenames, os.listdir,), 3844 (self.filenames, os.rename, "dst"), 3845 (self.filenames, os.replace, "dst"), 3846 )) 3847 if os_helper.can_chmod(): 3848 funcs.append((self.filenames, os.chmod, 0o777)) 3849 if hasattr(os, "chown"): 3850 funcs.append((self.filenames, os.chown, 0, 0)) 3851 if hasattr(os, "lchown"): 3852 funcs.append((self.filenames, os.lchown, 0, 0)) 3853 if hasattr(os, "truncate"): 3854 funcs.append((self.filenames, os.truncate, 0)) 3855 if hasattr(os, "chflags"): 3856 funcs.append((self.filenames, os.chflags, 0)) 3857 if hasattr(os, "lchflags"): 3858 funcs.append((self.filenames, os.lchflags, 0)) 3859 if hasattr(os, "chroot"): 3860 funcs.append((self.filenames, os.chroot,)) 3861 if hasattr(os, "link"): 3862 if sys.platform == "win32": 3863 funcs.append((self.bytes_filenames, os.link, b"dst")) 3864 funcs.append((self.unicode_filenames, os.link, "dst")) 3865 else: 3866 funcs.append((self.filenames, os.link, "dst")) 3867 if hasattr(os, "listxattr"): 3868 funcs.extend(( 3869 (self.filenames, os.listxattr,), 3870 (self.filenames, os.getxattr, "user.test"), 3871 (self.filenames, os.setxattr, "user.test", b'user'), 3872 (self.filenames, os.removexattr, "user.test"), 3873 )) 3874 if hasattr(os, "lchmod"): 3875 funcs.append((self.filenames, os.lchmod, 0o777)) 3876 if hasattr(os, "readlink"): 3877 funcs.append((self.filenames, os.readlink,)) 3878 3879 3880 for filenames, func, *func_args in funcs: 3881 for name in filenames: 3882 try: 3883 if isinstance(name, (str, bytes)): 3884 func(name, *func_args) 3885 else: 3886 with self.assertWarnsRegex(DeprecationWarning, 'should be'): 3887 func(name, *func_args) 3888 except OSError as err: 3889 self.assertIs(err.filename, name, str(func)) 3890 except UnicodeDecodeError: 3891 pass 3892 else: 3893 self.fail("No exception thrown by {}".format(func)) 3894 3895class CPUCountTests(unittest.TestCase): 3896 def test_cpu_count(self): 3897 cpus = os.cpu_count() 3898 if cpus is not None: 3899 self.assertIsInstance(cpus, int) 3900 self.assertGreater(cpus, 0) 3901 else: 3902 self.skipTest("Could not determine the number of CPUs") 3903 3904 3905# FD inheritance check is only useful for systems with process support. 3906@support.requires_subprocess() 3907class FDInheritanceTests(unittest.TestCase): 3908 def test_get_set_inheritable(self): 3909 fd = os.open(__file__, os.O_RDONLY) 3910 self.addCleanup(os.close, fd) 3911 self.assertEqual(os.get_inheritable(fd), False) 3912 3913 os.set_inheritable(fd, True) 3914 self.assertEqual(os.get_inheritable(fd), True) 3915 3916 @unittest.skipIf(fcntl is None, "need fcntl") 3917 def test_get_inheritable_cloexec(self): 3918 fd = os.open(__file__, os.O_RDONLY) 3919 self.addCleanup(os.close, fd) 3920 self.assertEqual(os.get_inheritable(fd), False) 3921 3922 # clear FD_CLOEXEC flag 3923 flags = fcntl.fcntl(fd, fcntl.F_GETFD) 3924 flags &= ~fcntl.FD_CLOEXEC 3925 fcntl.fcntl(fd, fcntl.F_SETFD, flags) 3926 3927 self.assertEqual(os.get_inheritable(fd), True) 3928 3929 @unittest.skipIf(fcntl is None, "need fcntl") 3930 def test_set_inheritable_cloexec(self): 3931 fd = os.open(__file__, os.O_RDONLY) 3932 self.addCleanup(os.close, fd) 3933 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, 3934 fcntl.FD_CLOEXEC) 3935 3936 os.set_inheritable(fd, True) 3937 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, 3938 0) 3939 3940 @unittest.skipUnless(hasattr(os, 'O_PATH'), "need os.O_PATH") 3941 def test_get_set_inheritable_o_path(self): 3942 fd = os.open(__file__, os.O_PATH) 3943 self.addCleanup(os.close, fd) 3944 self.assertEqual(os.get_inheritable(fd), False) 3945 3946 os.set_inheritable(fd, True) 3947 self.assertEqual(os.get_inheritable(fd), True) 3948 3949 os.set_inheritable(fd, False) 3950 self.assertEqual(os.get_inheritable(fd), False) 3951 3952 def test_get_set_inheritable_badf(self): 3953 fd = os_helper.make_bad_fd() 3954 3955 with self.assertRaises(OSError) as ctx: 3956 os.get_inheritable(fd) 3957 self.assertEqual(ctx.exception.errno, errno.EBADF) 3958 3959 with self.assertRaises(OSError) as ctx: 3960 os.set_inheritable(fd, True) 3961 self.assertEqual(ctx.exception.errno, errno.EBADF) 3962 3963 with self.assertRaises(OSError) as ctx: 3964 os.set_inheritable(fd, False) 3965 self.assertEqual(ctx.exception.errno, errno.EBADF) 3966 3967 def test_open(self): 3968 fd = os.open(__file__, os.O_RDONLY) 3969 self.addCleanup(os.close, fd) 3970 self.assertEqual(os.get_inheritable(fd), False) 3971 3972 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()") 3973 def test_pipe(self): 3974 rfd, wfd = os.pipe() 3975 self.addCleanup(os.close, rfd) 3976 self.addCleanup(os.close, wfd) 3977 self.assertEqual(os.get_inheritable(rfd), False) 3978 self.assertEqual(os.get_inheritable(wfd), False) 3979 3980 def test_dup(self): 3981 fd1 = os.open(__file__, os.O_RDONLY) 3982 self.addCleanup(os.close, fd1) 3983 3984 fd2 = os.dup(fd1) 3985 self.addCleanup(os.close, fd2) 3986 self.assertEqual(os.get_inheritable(fd2), False) 3987 3988 def test_dup_standard_stream(self): 3989 fd = os.dup(1) 3990 self.addCleanup(os.close, fd) 3991 self.assertGreater(fd, 0) 3992 3993 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test') 3994 def test_dup_nul(self): 3995 # os.dup() was creating inheritable fds for character files. 3996 fd1 = os.open('NUL', os.O_RDONLY) 3997 self.addCleanup(os.close, fd1) 3998 fd2 = os.dup(fd1) 3999 self.addCleanup(os.close, fd2) 4000 self.assertFalse(os.get_inheritable(fd2)) 4001 4002 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()") 4003 def test_dup2(self): 4004 fd = os.open(__file__, os.O_RDONLY) 4005 self.addCleanup(os.close, fd) 4006 4007 # inheritable by default 4008 fd2 = os.open(__file__, os.O_RDONLY) 4009 self.addCleanup(os.close, fd2) 4010 self.assertEqual(os.dup2(fd, fd2), fd2) 4011 self.assertTrue(os.get_inheritable(fd2)) 4012 4013 # force non-inheritable 4014 fd3 = os.open(__file__, os.O_RDONLY) 4015 self.addCleanup(os.close, fd3) 4016 self.assertEqual(os.dup2(fd, fd3, inheritable=False), fd3) 4017 self.assertFalse(os.get_inheritable(fd3)) 4018 4019 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()") 4020 def test_openpty(self): 4021 master_fd, slave_fd = os.openpty() 4022 self.addCleanup(os.close, master_fd) 4023 self.addCleanup(os.close, slave_fd) 4024 self.assertEqual(os.get_inheritable(master_fd), False) 4025 self.assertEqual(os.get_inheritable(slave_fd), False) 4026 4027 4028class PathTConverterTests(unittest.TestCase): 4029 # tuples of (function name, allows fd arguments, additional arguments to 4030 # function, cleanup function) 4031 functions = [ 4032 ('stat', True, (), None), 4033 ('lstat', False, (), None), 4034 ('access', False, (os.F_OK,), None), 4035 ('chflags', False, (0,), None), 4036 ('lchflags', False, (0,), None), 4037 ('open', False, (os.O_RDONLY,), getattr(os, 'close', None)), 4038 ] 4039 4040 def test_path_t_converter(self): 4041 str_filename = os_helper.TESTFN 4042 if os.name == 'nt': 4043 bytes_fspath = bytes_filename = None 4044 else: 4045 bytes_filename = os.fsencode(os_helper.TESTFN) 4046 bytes_fspath = FakePath(bytes_filename) 4047 fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT) 4048 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 4049 self.addCleanup(os.close, fd) 4050 4051 int_fspath = FakePath(fd) 4052 str_fspath = FakePath(str_filename) 4053 4054 for name, allow_fd, extra_args, cleanup_fn in self.functions: 4055 with self.subTest(name=name): 4056 try: 4057 fn = getattr(os, name) 4058 except AttributeError: 4059 continue 4060 4061 for path in (str_filename, bytes_filename, str_fspath, 4062 bytes_fspath): 4063 if path is None: 4064 continue 4065 with self.subTest(name=name, path=path): 4066 result = fn(path, *extra_args) 4067 if cleanup_fn is not None: 4068 cleanup_fn(result) 4069 4070 with self.assertRaisesRegex( 4071 TypeError, 'to return str or bytes'): 4072 fn(int_fspath, *extra_args) 4073 4074 if allow_fd: 4075 result = fn(fd, *extra_args) # should not fail 4076 if cleanup_fn is not None: 4077 cleanup_fn(result) 4078 else: 4079 with self.assertRaisesRegex( 4080 TypeError, 4081 'os.PathLike'): 4082 fn(fd, *extra_args) 4083 4084 def test_path_t_converter_and_custom_class(self): 4085 msg = r'__fspath__\(\) to return str or bytes, not %s' 4086 with self.assertRaisesRegex(TypeError, msg % r'int'): 4087 os.stat(FakePath(2)) 4088 with self.assertRaisesRegex(TypeError, msg % r'float'): 4089 os.stat(FakePath(2.34)) 4090 with self.assertRaisesRegex(TypeError, msg % r'object'): 4091 os.stat(FakePath(object())) 4092 4093 4094@unittest.skipUnless(hasattr(os, 'get_blocking'), 4095 'needs os.get_blocking() and os.set_blocking()') 4096@unittest.skipIf(support.is_emscripten, "Cannot unset blocking flag") 4097class BlockingTests(unittest.TestCase): 4098 def test_blocking(self): 4099 fd = os.open(__file__, os.O_RDONLY) 4100 self.addCleanup(os.close, fd) 4101 self.assertEqual(os.get_blocking(fd), True) 4102 4103 os.set_blocking(fd, False) 4104 self.assertEqual(os.get_blocking(fd), False) 4105 4106 os.set_blocking(fd, True) 4107 self.assertEqual(os.get_blocking(fd), True) 4108 4109 4110 4111class ExportsTests(unittest.TestCase): 4112 def test_os_all(self): 4113 self.assertIn('open', os.__all__) 4114 self.assertIn('walk', os.__all__) 4115 4116 4117class TestDirEntry(unittest.TestCase): 4118 def setUp(self): 4119 self.path = os.path.realpath(os_helper.TESTFN) 4120 self.addCleanup(os_helper.rmtree, self.path) 4121 os.mkdir(self.path) 4122 4123 def test_uninstantiable(self): 4124 self.assertRaises(TypeError, os.DirEntry) 4125 4126 def test_unpickable(self): 4127 filename = create_file(os.path.join(self.path, "file.txt"), b'python') 4128 entry = [entry for entry in os.scandir(self.path)].pop() 4129 self.assertIsInstance(entry, os.DirEntry) 4130 self.assertEqual(entry.name, "file.txt") 4131 import pickle 4132 self.assertRaises(TypeError, pickle.dumps, entry, filename) 4133 4134 4135class TestScandir(unittest.TestCase): 4136 check_no_resource_warning = warnings_helper.check_no_resource_warning 4137 4138 def setUp(self): 4139 self.path = os.path.realpath(os_helper.TESTFN) 4140 self.bytes_path = os.fsencode(self.path) 4141 self.addCleanup(os_helper.rmtree, self.path) 4142 os.mkdir(self.path) 4143 4144 def create_file(self, name="file.txt"): 4145 path = self.bytes_path if isinstance(name, bytes) else self.path 4146 filename = os.path.join(path, name) 4147 create_file(filename, b'python') 4148 return filename 4149 4150 def get_entries(self, names): 4151 entries = dict((entry.name, entry) 4152 for entry in os.scandir(self.path)) 4153 self.assertEqual(sorted(entries.keys()), names) 4154 return entries 4155 4156 def assert_stat_equal(self, stat1, stat2, skip_fields): 4157 if skip_fields: 4158 for attr in dir(stat1): 4159 if not attr.startswith("st_"): 4160 continue 4161 if attr in ("st_dev", "st_ino", "st_nlink"): 4162 continue 4163 self.assertEqual(getattr(stat1, attr), 4164 getattr(stat2, attr), 4165 (stat1, stat2, attr)) 4166 else: 4167 self.assertEqual(stat1, stat2) 4168 4169 def test_uninstantiable(self): 4170 scandir_iter = os.scandir(self.path) 4171 self.assertRaises(TypeError, type(scandir_iter)) 4172 scandir_iter.close() 4173 4174 def test_unpickable(self): 4175 filename = self.create_file("file.txt") 4176 scandir_iter = os.scandir(self.path) 4177 import pickle 4178 self.assertRaises(TypeError, pickle.dumps, scandir_iter, filename) 4179 scandir_iter.close() 4180 4181 def check_entry(self, entry, name, is_dir, is_file, is_symlink): 4182 self.assertIsInstance(entry, os.DirEntry) 4183 self.assertEqual(entry.name, name) 4184 self.assertEqual(entry.path, os.path.join(self.path, name)) 4185 self.assertEqual(entry.inode(), 4186 os.stat(entry.path, follow_symlinks=False).st_ino) 4187 4188 entry_stat = os.stat(entry.path) 4189 self.assertEqual(entry.is_dir(), 4190 stat.S_ISDIR(entry_stat.st_mode)) 4191 self.assertEqual(entry.is_file(), 4192 stat.S_ISREG(entry_stat.st_mode)) 4193 self.assertEqual(entry.is_symlink(), 4194 os.path.islink(entry.path)) 4195 4196 entry_lstat = os.stat(entry.path, follow_symlinks=False) 4197 self.assertEqual(entry.is_dir(follow_symlinks=False), 4198 stat.S_ISDIR(entry_lstat.st_mode)) 4199 self.assertEqual(entry.is_file(follow_symlinks=False), 4200 stat.S_ISREG(entry_lstat.st_mode)) 4201 4202 self.assert_stat_equal(entry.stat(), 4203 entry_stat, 4204 os.name == 'nt' and not is_symlink) 4205 self.assert_stat_equal(entry.stat(follow_symlinks=False), 4206 entry_lstat, 4207 os.name == 'nt') 4208 4209 def test_attributes(self): 4210 link = hasattr(os, 'link') 4211 symlink = os_helper.can_symlink() 4212 4213 dirname = os.path.join(self.path, "dir") 4214 os.mkdir(dirname) 4215 filename = self.create_file("file.txt") 4216 if link: 4217 try: 4218 os.link(filename, os.path.join(self.path, "link_file.txt")) 4219 except PermissionError as e: 4220 self.skipTest('os.link(): %s' % e) 4221 if symlink: 4222 os.symlink(dirname, os.path.join(self.path, "symlink_dir"), 4223 target_is_directory=True) 4224 os.symlink(filename, os.path.join(self.path, "symlink_file.txt")) 4225 4226 names = ['dir', 'file.txt'] 4227 if link: 4228 names.append('link_file.txt') 4229 if symlink: 4230 names.extend(('symlink_dir', 'symlink_file.txt')) 4231 entries = self.get_entries(names) 4232 4233 entry = entries['dir'] 4234 self.check_entry(entry, 'dir', True, False, False) 4235 4236 entry = entries['file.txt'] 4237 self.check_entry(entry, 'file.txt', False, True, False) 4238 4239 if link: 4240 entry = entries['link_file.txt'] 4241 self.check_entry(entry, 'link_file.txt', False, True, False) 4242 4243 if symlink: 4244 entry = entries['symlink_dir'] 4245 self.check_entry(entry, 'symlink_dir', True, False, True) 4246 4247 entry = entries['symlink_file.txt'] 4248 self.check_entry(entry, 'symlink_file.txt', False, True, True) 4249 4250 def get_entry(self, name): 4251 path = self.bytes_path if isinstance(name, bytes) else self.path 4252 entries = list(os.scandir(path)) 4253 self.assertEqual(len(entries), 1) 4254 4255 entry = entries[0] 4256 self.assertEqual(entry.name, name) 4257 return entry 4258 4259 def create_file_entry(self, name='file.txt'): 4260 filename = self.create_file(name=name) 4261 return self.get_entry(os.path.basename(filename)) 4262 4263 def test_current_directory(self): 4264 filename = self.create_file() 4265 old_dir = os.getcwd() 4266 try: 4267 os.chdir(self.path) 4268 4269 # call scandir() without parameter: it must list the content 4270 # of the current directory 4271 entries = dict((entry.name, entry) for entry in os.scandir()) 4272 self.assertEqual(sorted(entries.keys()), 4273 [os.path.basename(filename)]) 4274 finally: 4275 os.chdir(old_dir) 4276 4277 def test_repr(self): 4278 entry = self.create_file_entry() 4279 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>") 4280 4281 def test_fspath_protocol(self): 4282 entry = self.create_file_entry() 4283 self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt')) 4284 4285 def test_fspath_protocol_bytes(self): 4286 bytes_filename = os.fsencode('bytesfile.txt') 4287 bytes_entry = self.create_file_entry(name=bytes_filename) 4288 fspath = os.fspath(bytes_entry) 4289 self.assertIsInstance(fspath, bytes) 4290 self.assertEqual(fspath, 4291 os.path.join(os.fsencode(self.path),bytes_filename)) 4292 4293 def test_removed_dir(self): 4294 path = os.path.join(self.path, 'dir') 4295 4296 os.mkdir(path) 4297 entry = self.get_entry('dir') 4298 os.rmdir(path) 4299 4300 # On POSIX, is_dir() result depends if scandir() filled d_type or not 4301 if os.name == 'nt': 4302 self.assertTrue(entry.is_dir()) 4303 self.assertFalse(entry.is_file()) 4304 self.assertFalse(entry.is_symlink()) 4305 if os.name == 'nt': 4306 self.assertRaises(FileNotFoundError, entry.inode) 4307 # don't fail 4308 entry.stat() 4309 entry.stat(follow_symlinks=False) 4310 else: 4311 self.assertGreater(entry.inode(), 0) 4312 self.assertRaises(FileNotFoundError, entry.stat) 4313 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False) 4314 4315 def test_removed_file(self): 4316 entry = self.create_file_entry() 4317 os.unlink(entry.path) 4318 4319 self.assertFalse(entry.is_dir()) 4320 # On POSIX, is_dir() result depends if scandir() filled d_type or not 4321 if os.name == 'nt': 4322 self.assertTrue(entry.is_file()) 4323 self.assertFalse(entry.is_symlink()) 4324 if os.name == 'nt': 4325 self.assertRaises(FileNotFoundError, entry.inode) 4326 # don't fail 4327 entry.stat() 4328 entry.stat(follow_symlinks=False) 4329 else: 4330 self.assertGreater(entry.inode(), 0) 4331 self.assertRaises(FileNotFoundError, entry.stat) 4332 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False) 4333 4334 def test_broken_symlink(self): 4335 if not os_helper.can_symlink(): 4336 return self.skipTest('cannot create symbolic link') 4337 4338 filename = self.create_file("file.txt") 4339 os.symlink(filename, 4340 os.path.join(self.path, "symlink.txt")) 4341 entries = self.get_entries(['file.txt', 'symlink.txt']) 4342 entry = entries['symlink.txt'] 4343 os.unlink(filename) 4344 4345 self.assertGreater(entry.inode(), 0) 4346 self.assertFalse(entry.is_dir()) 4347 self.assertFalse(entry.is_file()) # broken symlink returns False 4348 self.assertFalse(entry.is_dir(follow_symlinks=False)) 4349 self.assertFalse(entry.is_file(follow_symlinks=False)) 4350 self.assertTrue(entry.is_symlink()) 4351 self.assertRaises(FileNotFoundError, entry.stat) 4352 # don't fail 4353 entry.stat(follow_symlinks=False) 4354 4355 def test_bytes(self): 4356 self.create_file("file.txt") 4357 4358 path_bytes = os.fsencode(self.path) 4359 entries = list(os.scandir(path_bytes)) 4360 self.assertEqual(len(entries), 1, entries) 4361 entry = entries[0] 4362 4363 self.assertEqual(entry.name, b'file.txt') 4364 self.assertEqual(entry.path, 4365 os.fsencode(os.path.join(self.path, 'file.txt'))) 4366 4367 def test_bytes_like(self): 4368 self.create_file("file.txt") 4369 4370 for cls in bytearray, memoryview: 4371 path_bytes = cls(os.fsencode(self.path)) 4372 with self.assertWarns(DeprecationWarning): 4373 entries = list(os.scandir(path_bytes)) 4374 self.assertEqual(len(entries), 1, entries) 4375 entry = entries[0] 4376 4377 self.assertEqual(entry.name, b'file.txt') 4378 self.assertEqual(entry.path, 4379 os.fsencode(os.path.join(self.path, 'file.txt'))) 4380 self.assertIs(type(entry.name), bytes) 4381 self.assertIs(type(entry.path), bytes) 4382 4383 @unittest.skipUnless(os.listdir in os.supports_fd, 4384 'fd support for listdir required for this test.') 4385 def test_fd(self): 4386 self.assertIn(os.scandir, os.supports_fd) 4387 self.create_file('file.txt') 4388 expected_names = ['file.txt'] 4389 if os_helper.can_symlink(): 4390 os.symlink('file.txt', os.path.join(self.path, 'link')) 4391 expected_names.append('link') 4392 4393 with os_helper.open_dir_fd(self.path) as fd: 4394 with os.scandir(fd) as it: 4395 entries = list(it) 4396 names = [entry.name for entry in entries] 4397 self.assertEqual(sorted(names), expected_names) 4398 self.assertEqual(names, os.listdir(fd)) 4399 for entry in entries: 4400 self.assertEqual(entry.path, entry.name) 4401 self.assertEqual(os.fspath(entry), entry.name) 4402 self.assertEqual(entry.is_symlink(), entry.name == 'link') 4403 if os.stat in os.supports_dir_fd: 4404 st = os.stat(entry.name, dir_fd=fd) 4405 self.assertEqual(entry.stat(), st) 4406 st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False) 4407 self.assertEqual(entry.stat(follow_symlinks=False), st) 4408 4409 @unittest.skipIf(support.is_wasi, "WASI maps '' to cwd") 4410 def test_empty_path(self): 4411 self.assertRaises(FileNotFoundError, os.scandir, '') 4412 4413 def test_consume_iterator_twice(self): 4414 self.create_file("file.txt") 4415 iterator = os.scandir(self.path) 4416 4417 entries = list(iterator) 4418 self.assertEqual(len(entries), 1, entries) 4419 4420 # check than consuming the iterator twice doesn't raise exception 4421 entries2 = list(iterator) 4422 self.assertEqual(len(entries2), 0, entries2) 4423 4424 def test_bad_path_type(self): 4425 for obj in [1.234, {}, []]: 4426 self.assertRaises(TypeError, os.scandir, obj) 4427 4428 def test_close(self): 4429 self.create_file("file.txt") 4430 self.create_file("file2.txt") 4431 iterator = os.scandir(self.path) 4432 next(iterator) 4433 iterator.close() 4434 # multiple closes 4435 iterator.close() 4436 with self.check_no_resource_warning(): 4437 del iterator 4438 4439 def test_context_manager(self): 4440 self.create_file("file.txt") 4441 self.create_file("file2.txt") 4442 with os.scandir(self.path) as iterator: 4443 next(iterator) 4444 with self.check_no_resource_warning(): 4445 del iterator 4446 4447 def test_context_manager_close(self): 4448 self.create_file("file.txt") 4449 self.create_file("file2.txt") 4450 with os.scandir(self.path) as iterator: 4451 next(iterator) 4452 iterator.close() 4453 4454 def test_context_manager_exception(self): 4455 self.create_file("file.txt") 4456 self.create_file("file2.txt") 4457 with self.assertRaises(ZeroDivisionError): 4458 with os.scandir(self.path) as iterator: 4459 next(iterator) 4460 1/0 4461 with self.check_no_resource_warning(): 4462 del iterator 4463 4464 def test_resource_warning(self): 4465 self.create_file("file.txt") 4466 self.create_file("file2.txt") 4467 iterator = os.scandir(self.path) 4468 next(iterator) 4469 with self.assertWarns(ResourceWarning): 4470 del iterator 4471 support.gc_collect() 4472 # exhausted iterator 4473 iterator = os.scandir(self.path) 4474 list(iterator) 4475 with self.check_no_resource_warning(): 4476 del iterator 4477 4478 4479class TestPEP519(unittest.TestCase): 4480 4481 # Abstracted so it can be overridden to test pure Python implementation 4482 # if a C version is provided. 4483 fspath = staticmethod(os.fspath) 4484 4485 def test_return_bytes(self): 4486 for b in b'hello', b'goodbye', b'some/path/and/file': 4487 self.assertEqual(b, self.fspath(b)) 4488 4489 def test_return_string(self): 4490 for s in 'hello', 'goodbye', 'some/path/and/file': 4491 self.assertEqual(s, self.fspath(s)) 4492 4493 def test_fsencode_fsdecode(self): 4494 for p in "path/like/object", b"path/like/object": 4495 pathlike = FakePath(p) 4496 4497 self.assertEqual(p, self.fspath(pathlike)) 4498 self.assertEqual(b"path/like/object", os.fsencode(pathlike)) 4499 self.assertEqual("path/like/object", os.fsdecode(pathlike)) 4500 4501 def test_pathlike(self): 4502 self.assertEqual('#feelthegil', self.fspath(FakePath('#feelthegil'))) 4503 self.assertTrue(issubclass(FakePath, os.PathLike)) 4504 self.assertTrue(isinstance(FakePath('x'), os.PathLike)) 4505 4506 def test_garbage_in_exception_out(self): 4507 vapor = type('blah', (), {}) 4508 for o in int, type, os, vapor(): 4509 self.assertRaises(TypeError, self.fspath, o) 4510 4511 def test_argument_required(self): 4512 self.assertRaises(TypeError, self.fspath) 4513 4514 def test_bad_pathlike(self): 4515 # __fspath__ returns a value other than str or bytes. 4516 self.assertRaises(TypeError, self.fspath, FakePath(42)) 4517 # __fspath__ attribute that is not callable. 4518 c = type('foo', (), {}) 4519 c.__fspath__ = 1 4520 self.assertRaises(TypeError, self.fspath, c()) 4521 # __fspath__ raises an exception. 4522 self.assertRaises(ZeroDivisionError, self.fspath, 4523 FakePath(ZeroDivisionError())) 4524 4525 def test_pathlike_subclasshook(self): 4526 # bpo-38878: subclasshook causes subclass checks 4527 # true on abstract implementation. 4528 class A(os.PathLike): 4529 pass 4530 self.assertFalse(issubclass(FakePath, A)) 4531 self.assertTrue(issubclass(FakePath, os.PathLike)) 4532 4533 def test_pathlike_class_getitem(self): 4534 self.assertIsInstance(os.PathLike[bytes], types.GenericAlias) 4535 4536 4537class TimesTests(unittest.TestCase): 4538 def test_times(self): 4539 times = os.times() 4540 self.assertIsInstance(times, os.times_result) 4541 4542 for field in ('user', 'system', 'children_user', 'children_system', 4543 'elapsed'): 4544 value = getattr(times, field) 4545 self.assertIsInstance(value, float) 4546 4547 if os.name == 'nt': 4548 self.assertEqual(times.children_user, 0) 4549 self.assertEqual(times.children_system, 0) 4550 self.assertEqual(times.elapsed, 0) 4551 4552 4553@support.requires_fork() 4554class ForkTests(unittest.TestCase): 4555 def test_fork(self): 4556 # bpo-42540: ensure os.fork() with non-default memory allocator does 4557 # not crash on exit. 4558 code = """if 1: 4559 import os 4560 from test import support 4561 pid = os.fork() 4562 if pid != 0: 4563 support.wait_process(pid, exitcode=0) 4564 """ 4565 assert_python_ok("-c", code) 4566 assert_python_ok("-c", code, PYTHONMALLOC="malloc_debug") 4567 4568 4569# Only test if the C version is provided, otherwise TestPEP519 already tested 4570# the pure Python implementation. 4571if hasattr(os, "_fspath"): 4572 class TestPEP519PurePython(TestPEP519): 4573 4574 """Explicitly test the pure Python implementation of os.fspath().""" 4575 4576 fspath = staticmethod(os._fspath) 4577 4578 4579if __name__ == "__main__": 4580 unittest.main() 4581