1# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
3# portable than they had been thought to be.
4
5import asyncio
6import codecs
7import contextlib
8import decimal
9import errno
10import fnmatch
11import fractions
12import itertools
13import locale
14import os
15import pickle
16import select
17import shutil
18import signal
19import socket
20import stat
21import struct
22import subprocess
23import sys
24import sysconfig
25import tempfile
26import textwrap
27import time
28import types
29import unittest
30import uuid
31import warnings
32from test import support
33from test.support import import_helper
34from test.support import os_helper
35from test.support import socket_helper
36from test.support import warnings_helper
37from platform import win32_is_iot
38
39try:
40    import resource
41except ImportError:
42    resource = None
43try:
44    import fcntl
45except ImportError:
46    fcntl = None
47try:
48    import _winapi
49except ImportError:
50    _winapi = None
51try:
52    import pwd
53    all_users = [u.pw_uid for u in pwd.getpwall()]
54except (ImportError, AttributeError):
55    all_users = []
56try:
57    from _testcapi import INT_MAX, PY_SSIZE_T_MAX
58except ImportError:
59    INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
60
61try:
62    import mmap
63except ImportError:
64    mmap = None
65
66from test.support.script_helper import assert_python_ok
67from test.support import unix_shell
68from test.support.os_helper import FakePath
69
70
71root_in_posix = False
72if hasattr(os, 'geteuid'):
73    root_in_posix = (os.geteuid() == 0)
74
75# Detect whether we're on a Linux system that uses the (now outdated
76# and unmaintained) linuxthreads threading library.  There's an issue
77# when combining linuxthreads with a failed execv call: see
78# http://bugs.python.org/issue4970.
79if hasattr(sys, 'thread_info') and sys.thread_info.version:
80    USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
81else:
82    USING_LINUXTHREADS = False
83
84# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
85HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
86
87
88def requires_os_func(name):
89    return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)
90
91
92def create_file(filename, content=b'content'):
93    with open(filename, "xb", 0) as fp:
94        fp.write(content)
95
96
97# bpo-41625: On AIX, splice() only works with a socket, not with a pipe.
98requires_splice_pipe = unittest.skipIf(sys.platform.startswith("aix"),
99                                       'on AIX, splice() only accepts sockets')
100
101
102def tearDownModule():
103    asyncio.set_event_loop_policy(None)
104
105
106class MiscTests(unittest.TestCase):
107    def test_getcwd(self):
108        cwd = os.getcwd()
109        self.assertIsInstance(cwd, str)
110
111    def test_getcwd_long_path(self):
112        # bpo-37412: On Linux, PATH_MAX is usually around 4096 bytes. On
113        # Windows, MAX_PATH is defined as 260 characters, but Windows supports
114        # longer path if longer paths support is enabled. Internally, the os
115        # module uses MAXPATHLEN which is at least 1024.
116        #
117        # Use a directory name of 200 characters to fit into Windows MAX_PATH
118        # limit.
119        #
120        # On Windows, the test can stop when trying to create a path longer
121        # than MAX_PATH if long paths support is disabled:
122        # see RtlAreLongPathsEnabled().
123        min_len = 2000   # characters
124        # On VxWorks, PATH_MAX is defined as 1024 bytes. Creating a path
125        # longer than PATH_MAX will fail.
126        if sys.platform == 'vxworks':
127            min_len = 1000
128        dirlen = 200     # characters
129        dirname = 'python_test_dir_'
130        dirname = dirname + ('a' * (dirlen - len(dirname)))
131
132        with tempfile.TemporaryDirectory() as tmpdir:
133            with os_helper.change_cwd(tmpdir) as path:
134                expected = path
135
136                while True:
137                    cwd = os.getcwd()
138                    self.assertEqual(cwd, expected)
139
140                    need = min_len - (len(cwd) + len(os.path.sep))
141                    if need <= 0:
142                        break
143                    if len(dirname) > need and need > 0:
144                        dirname = dirname[:need]
145
146                    path = os.path.join(path, dirname)
147                    try:
148                        os.mkdir(path)
149                        # On Windows, chdir() can fail
150                        # even if mkdir() succeeded
151                        os.chdir(path)
152                    except FileNotFoundError:
153                        # On Windows, catch ERROR_PATH_NOT_FOUND (3) and
154                        # ERROR_FILENAME_EXCED_RANGE (206) errors
155                        # ("The filename or extension is too long")
156                        break
157                    except OSError as exc:
158                        if exc.errno == errno.ENAMETOOLONG:
159                            break
160                        else:
161                            raise
162
163                    expected = path
164
165                if support.verbose:
166                    print(f"Tested current directory length: {len(cwd)}")
167
168    def test_getcwdb(self):
169        cwd = os.getcwdb()
170        self.assertIsInstance(cwd, bytes)
171        self.assertEqual(os.fsdecode(cwd), os.getcwd())
172
173
174# Tests creating TESTFN
175class FileTests(unittest.TestCase):
176    def setUp(self):
177        if os.path.lexists(os_helper.TESTFN):
178            os.unlink(os_helper.TESTFN)
179    tearDown = setUp
180
181    def test_access(self):
182        f = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR)
183        os.close(f)
184        self.assertTrue(os.access(os_helper.TESTFN, os.W_OK))
185
186    @unittest.skipIf(
187        support.is_emscripten, "Test is unstable under Emscripten."
188    )
189    @unittest.skipIf(
190        support.is_wasi, "WASI does not support dup."
191    )
192    def test_closerange(self):
193        first = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR)
194        # We must allocate two consecutive file descriptors, otherwise
195        # it will mess up other file descriptors (perhaps even the three
196        # standard ones).
197        second = os.dup(first)
198        try:
199            retries = 0
200            while second != first + 1:
201                os.close(first)
202                retries += 1
203                if retries > 10:
204                    # XXX test skipped
205                    self.skipTest("couldn't allocate two consecutive fds")
206                first, second = second, os.dup(second)
207        finally:
208            os.close(second)
209        # close a fd that is open, and one that isn't
210        os.closerange(first, first + 2)
211        self.assertRaises(OSError, os.write, first, b"a")
212
213    @support.cpython_only
214    def test_rename(self):
215        path = os_helper.TESTFN
216        old = sys.getrefcount(path)
217        self.assertRaises(TypeError, os.rename, path, 0)
218        new = sys.getrefcount(path)
219        self.assertEqual(old, new)
220
221    def test_read(self):
222        with open(os_helper.TESTFN, "w+b") as fobj:
223            fobj.write(b"spam")
224            fobj.flush()
225            fd = fobj.fileno()
226            os.lseek(fd, 0, 0)
227            s = os.read(fd, 4)
228            self.assertEqual(type(s), bytes)
229            self.assertEqual(s, b"spam")
230
231    @support.cpython_only
232    # Skip the test on 32-bit platforms: the number of bytes must fit in a
233    # Py_ssize_t type
234    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
235                         "needs INT_MAX < PY_SSIZE_T_MAX")
236    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
237    def test_large_read(self, size):
238        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
239        create_file(os_helper.TESTFN, b'test')
240
241        # Issue #21932: Make sure that os.read() does not raise an
242        # OverflowError for size larger than INT_MAX
243        with open(os_helper.TESTFN, "rb") as fp:
244            data = os.read(fp.fileno(), size)
245
246        # The test does not try to read more than 2 GiB at once because the
247        # operating system is free to return less bytes than requested.
248        self.assertEqual(data, b'test')
249
250    def test_write(self):
251        # os.write() accepts bytes- and buffer-like objects but not strings
252        fd = os.open(os_helper.TESTFN, os.O_CREAT | os.O_WRONLY)
253        self.assertRaises(TypeError, os.write, fd, "beans")
254        os.write(fd, b"bacon\n")
255        os.write(fd, bytearray(b"eggs\n"))
256        os.write(fd, memoryview(b"spam\n"))
257        os.close(fd)
258        with open(os_helper.TESTFN, "rb") as fobj:
259            self.assertEqual(fobj.read().splitlines(),
260                [b"bacon", b"eggs", b"spam"])
261
262    def write_windows_console(self, *args):
263        retcode = subprocess.call(args,
264            # use a new console to not flood the test output
265            creationflags=subprocess.CREATE_NEW_CONSOLE,
266            # use a shell to hide the console window (SW_HIDE)
267            shell=True)
268        self.assertEqual(retcode, 0)
269
270    @unittest.skipUnless(sys.platform == 'win32',
271                         'test specific to the Windows console')
272    def test_write_windows_console(self):
273        # Issue #11395: the Windows console returns an error (12: not enough
274        # space error) on writing into stdout if stdout mode is binary and the
275        # length is greater than 66,000 bytes (or less, depending on heap
276        # usage).
277        code = "print('x' * 100000)"
278        self.write_windows_console(sys.executable, "-c", code)
279        self.write_windows_console(sys.executable, "-u", "-c", code)
280
281    def fdopen_helper(self, *args):
282        fd = os.open(os_helper.TESTFN, os.O_RDONLY)
283        f = os.fdopen(fd, *args, encoding="utf-8")
284        f.close()
285
286    def test_fdopen(self):
287        fd = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR)
288        os.close(fd)
289
290        self.fdopen_helper()
291        self.fdopen_helper('r')
292        self.fdopen_helper('r', 100)
293
294    def test_replace(self):
295        TESTFN2 = os_helper.TESTFN + ".2"
296        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
297        self.addCleanup(os_helper.unlink, TESTFN2)
298
299        create_file(os_helper.TESTFN, b"1")
300        create_file(TESTFN2, b"2")
301
302        os.replace(os_helper.TESTFN, TESTFN2)
303        self.assertRaises(FileNotFoundError, os.stat, os_helper.TESTFN)
304        with open(TESTFN2, 'r', encoding='utf-8') as f:
305            self.assertEqual(f.read(), "1")
306
307    def test_open_keywords(self):
308        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
309            dir_fd=None)
310        os.close(f)
311
312    def test_symlink_keywords(self):
313        symlink = support.get_attribute(os, "symlink")
314        try:
315            symlink(src='target', dst=os_helper.TESTFN,
316                target_is_directory=False, dir_fd=None)
317        except (NotImplementedError, OSError):
318            pass  # No OS support or unprivileged user
319
320    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
321    def test_copy_file_range_invalid_values(self):
322        with self.assertRaises(ValueError):
323            os.copy_file_range(0, 1, -10)
324
325    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
326    def test_copy_file_range(self):
327        TESTFN2 = os_helper.TESTFN + ".3"
328        data = b'0123456789'
329
330        create_file(os_helper.TESTFN, data)
331        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
332
333        in_file = open(os_helper.TESTFN, 'rb')
334        self.addCleanup(in_file.close)
335        in_fd = in_file.fileno()
336
337        out_file = open(TESTFN2, 'w+b')
338        self.addCleanup(os_helper.unlink, TESTFN2)
339        self.addCleanup(out_file.close)
340        out_fd = out_file.fileno()
341
342        try:
343            i = os.copy_file_range(in_fd, out_fd, 5)
344        except OSError as e:
345            # Handle the case in which Python was compiled
346            # in a system with the syscall but without support
347            # in the kernel.
348            if e.errno != errno.ENOSYS:
349                raise
350            self.skipTest(e)
351        else:
352            # The number of copied bytes can be less than
353            # the number of bytes originally requested.
354            self.assertIn(i, range(0, 6));
355
356            with open(TESTFN2, 'rb') as in_file:
357                self.assertEqual(in_file.read(), data[:i])
358
359    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
360    def test_copy_file_range_offset(self):
361        TESTFN4 = os_helper.TESTFN + ".4"
362        data = b'0123456789'
363        bytes_to_copy = 6
364        in_skip = 3
365        out_seek = 5
366
367        create_file(os_helper.TESTFN, data)
368        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
369
370        in_file = open(os_helper.TESTFN, 'rb')
371        self.addCleanup(in_file.close)
372        in_fd = in_file.fileno()
373
374        out_file = open(TESTFN4, 'w+b')
375        self.addCleanup(os_helper.unlink, TESTFN4)
376        self.addCleanup(out_file.close)
377        out_fd = out_file.fileno()
378
379        try:
380            i = os.copy_file_range(in_fd, out_fd, bytes_to_copy,
381                                   offset_src=in_skip,
382                                   offset_dst=out_seek)
383        except OSError as e:
384            # Handle the case in which Python was compiled
385            # in a system with the syscall but without support
386            # in the kernel.
387            if e.errno != errno.ENOSYS:
388                raise
389            self.skipTest(e)
390        else:
391            # The number of copied bytes can be less than
392            # the number of bytes originally requested.
393            self.assertIn(i, range(0, bytes_to_copy+1));
394
395            with open(TESTFN4, 'rb') as in_file:
396                read = in_file.read()
397            # seeked bytes (5) are zero'ed
398            self.assertEqual(read[:out_seek], b'\x00'*out_seek)
399            # 012 are skipped (in_skip)
400            # 345678 are copied in the file (in_skip + bytes_to_copy)
401            self.assertEqual(read[out_seek:],
402                             data[in_skip:in_skip+i])
403
404    @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()')
405    def test_splice_invalid_values(self):
406        with self.assertRaises(ValueError):
407            os.splice(0, 1, -10)
408
409    @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()')
410    @requires_splice_pipe
411    def test_splice(self):
412        TESTFN2 = os_helper.TESTFN + ".3"
413        data = b'0123456789'
414
415        create_file(os_helper.TESTFN, data)
416        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
417
418        in_file = open(os_helper.TESTFN, 'rb')
419        self.addCleanup(in_file.close)
420        in_fd = in_file.fileno()
421
422        read_fd, write_fd = os.pipe()
423        self.addCleanup(lambda: os.close(read_fd))
424        self.addCleanup(lambda: os.close(write_fd))
425
426        try:
427            i = os.splice(in_fd, write_fd, 5)
428        except OSError as e:
429            # Handle the case in which Python was compiled
430            # in a system with the syscall but without support
431            # in the kernel.
432            if e.errno != errno.ENOSYS:
433                raise
434            self.skipTest(e)
435        else:
436            # The number of copied bytes can be less than
437            # the number of bytes originally requested.
438            self.assertIn(i, range(0, 6));
439
440            self.assertEqual(os.read(read_fd, 100), data[:i])
441
442    @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()')
443    @requires_splice_pipe
444    def test_splice_offset_in(self):
445        TESTFN4 = os_helper.TESTFN + ".4"
446        data = b'0123456789'
447        bytes_to_copy = 6
448        in_skip = 3
449
450        create_file(os_helper.TESTFN, data)
451        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
452
453        in_file = open(os_helper.TESTFN, 'rb')
454        self.addCleanup(in_file.close)
455        in_fd = in_file.fileno()
456
457        read_fd, write_fd = os.pipe()
458        self.addCleanup(lambda: os.close(read_fd))
459        self.addCleanup(lambda: os.close(write_fd))
460
461        try:
462            i = os.splice(in_fd, write_fd, bytes_to_copy, offset_src=in_skip)
463        except OSError as e:
464            # Handle the case in which Python was compiled
465            # in a system with the syscall but without support
466            # in the kernel.
467            if e.errno != errno.ENOSYS:
468                raise
469            self.skipTest(e)
470        else:
471            # The number of copied bytes can be less than
472            # the number of bytes originally requested.
473            self.assertIn(i, range(0, bytes_to_copy+1));
474
475            read = os.read(read_fd, 100)
476            # 012 are skipped (in_skip)
477            # 345678 are copied in the file (in_skip + bytes_to_copy)
478            self.assertEqual(read, data[in_skip:in_skip+i])
479
480    @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()')
481    @requires_splice_pipe
482    def test_splice_offset_out(self):
483        TESTFN4 = os_helper.TESTFN + ".4"
484        data = b'0123456789'
485        bytes_to_copy = 6
486        out_seek = 3
487
488        create_file(os_helper.TESTFN, data)
489        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
490
491        read_fd, write_fd = os.pipe()
492        self.addCleanup(lambda: os.close(read_fd))
493        self.addCleanup(lambda: os.close(write_fd))
494        os.write(write_fd, data)
495
496        out_file = open(TESTFN4, 'w+b')
497        self.addCleanup(os_helper.unlink, TESTFN4)
498        self.addCleanup(out_file.close)
499        out_fd = out_file.fileno()
500
501        try:
502            i = os.splice(read_fd, out_fd, bytes_to_copy, offset_dst=out_seek)
503        except OSError as e:
504            # Handle the case in which Python was compiled
505            # in a system with the syscall but without support
506            # in the kernel.
507            if e.errno != errno.ENOSYS:
508                raise
509            self.skipTest(e)
510        else:
511            # The number of copied bytes can be less than
512            # the number of bytes originally requested.
513            self.assertIn(i, range(0, bytes_to_copy+1));
514
515            with open(TESTFN4, 'rb') as in_file:
516                read = in_file.read()
517            # seeked bytes (5) are zero'ed
518            self.assertEqual(read[:out_seek], b'\x00'*out_seek)
519            # 012 are skipped (in_skip)
520            # 345678 are copied in the file (in_skip + bytes_to_copy)
521            self.assertEqual(read[out_seek:], data[:i])
522
523
524# Test attributes on return values from os.*stat* family.
525class StatAttributeTests(unittest.TestCase):
526    def setUp(self):
527        self.fname = os_helper.TESTFN
528        self.addCleanup(os_helper.unlink, self.fname)
529        create_file(self.fname, b"ABC")
530
531    def check_stat_attributes(self, fname):
532        result = os.stat(fname)
533
534        # Make sure direct access works
535        self.assertEqual(result[stat.ST_SIZE], 3)
536        self.assertEqual(result.st_size, 3)
537
538        # Make sure all the attributes are there
539        members = dir(result)
540        for name in dir(stat):
541            if name[:3] == 'ST_':
542                attr = name.lower()
543                if name.endswith("TIME"):
544                    def trunc(x): return int(x)
545                else:
546                    def trunc(x): return x
547                self.assertEqual(trunc(getattr(result, attr)),
548                                  result[getattr(stat, name)])
549                self.assertIn(attr, members)
550
551        # Make sure that the st_?time and st_?time_ns fields roughly agree
552        # (they should always agree up to around tens-of-microseconds)
553        for name in 'st_atime st_mtime st_ctime'.split():
554            floaty = int(getattr(result, name) * 100000)
555            nanosecondy = getattr(result, name + "_ns") // 10000
556            self.assertAlmostEqual(floaty, nanosecondy, delta=2)
557
558        try:
559            result[200]
560            self.fail("No exception raised")
561        except IndexError:
562            pass
563
564        # Make sure that assignment fails
565        try:
566            result.st_mode = 1
567            self.fail("No exception raised")
568        except AttributeError:
569            pass
570
571        try:
572            result.st_rdev = 1
573            self.fail("No exception raised")
574        except (AttributeError, TypeError):
575            pass
576
577        try:
578            result.parrot = 1
579            self.fail("No exception raised")
580        except AttributeError:
581            pass
582
583        # Use the stat_result constructor with a too-short tuple.
584        try:
585            result2 = os.stat_result((10,))
586            self.fail("No exception raised")
587        except TypeError:
588            pass
589
590        # Use the constructor with a too-long tuple.
591        try:
592            result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
593        except TypeError:
594            pass
595
596    def test_stat_attributes(self):
597        self.check_stat_attributes(self.fname)
598
599    def test_stat_attributes_bytes(self):
600        try:
601            fname = self.fname.encode(sys.getfilesystemencoding())
602        except UnicodeEncodeError:
603            self.skipTest("cannot encode %a for the filesystem" % self.fname)
604        self.check_stat_attributes(fname)
605
606    def test_stat_result_pickle(self):
607        result = os.stat(self.fname)
608        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
609            p = pickle.dumps(result, proto)
610            self.assertIn(b'stat_result', p)
611            if proto < 4:
612                self.assertIn(b'cos\nstat_result\n', p)
613            unpickled = pickle.loads(p)
614            self.assertEqual(result, unpickled)
615
616    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
617    def test_statvfs_attributes(self):
618        result = os.statvfs(self.fname)
619
620        # Make sure direct access works
621        self.assertEqual(result.f_bfree, result[3])
622
623        # Make sure all the attributes are there.
624        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
625                    'ffree', 'favail', 'flag', 'namemax')
626        for value, member in enumerate(members):
627            self.assertEqual(getattr(result, 'f_' + member), result[value])
628
629        self.assertTrue(isinstance(result.f_fsid, int))
630
631        # Test that the size of the tuple doesn't change
632        self.assertEqual(len(result), 10)
633
634        # Make sure that assignment really fails
635        try:
636            result.f_bfree = 1
637            self.fail("No exception raised")
638        except AttributeError:
639            pass
640
641        try:
642            result.parrot = 1
643            self.fail("No exception raised")
644        except AttributeError:
645            pass
646
647        # Use the constructor with a too-short tuple.
648        try:
649            result2 = os.statvfs_result((10,))
650            self.fail("No exception raised")
651        except TypeError:
652            pass
653
654        # Use the constructor with a too-long tuple.
655        try:
656            result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
657        except TypeError:
658            pass
659
660    @unittest.skipUnless(hasattr(os, 'statvfs'),
661                         "need os.statvfs()")
662    def test_statvfs_result_pickle(self):
663        result = os.statvfs(self.fname)
664
665        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
666            p = pickle.dumps(result, proto)
667            self.assertIn(b'statvfs_result', p)
668            if proto < 4:
669                self.assertIn(b'cos\nstatvfs_result\n', p)
670            unpickled = pickle.loads(p)
671            self.assertEqual(result, unpickled)
672
673    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
674    def test_1686475(self):
675        # Verify that an open file can be stat'ed
676        try:
677            os.stat(r"c:\pagefile.sys")
678        except FileNotFoundError:
679            self.skipTest(r'c:\pagefile.sys does not exist')
680        except OSError as e:
681            self.fail("Could not stat pagefile.sys")
682
683    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
684    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
685    def test_15261(self):
686        # Verify that stat'ing a closed fd does not cause crash
687        r, w = os.pipe()
688        try:
689            os.stat(r)          # should not raise error
690        finally:
691            os.close(r)
692            os.close(w)
693        with self.assertRaises(OSError) as ctx:
694            os.stat(r)
695        self.assertEqual(ctx.exception.errno, errno.EBADF)
696
697    def check_file_attributes(self, result):
698        self.assertTrue(hasattr(result, 'st_file_attributes'))
699        self.assertTrue(isinstance(result.st_file_attributes, int))
700        self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
701
702    @unittest.skipUnless(sys.platform == "win32",
703                         "st_file_attributes is Win32 specific")
704    def test_file_attributes(self):
705        # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
706        result = os.stat(self.fname)
707        self.check_file_attributes(result)
708        self.assertEqual(
709            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
710            0)
711
712        # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
713        dirname = os_helper.TESTFN + "dir"
714        os.mkdir(dirname)
715        self.addCleanup(os.rmdir, dirname)
716
717        result = os.stat(dirname)
718        self.check_file_attributes(result)
719        self.assertEqual(
720            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
721            stat.FILE_ATTRIBUTE_DIRECTORY)
722
723    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
724    def test_access_denied(self):
725        # Default to FindFirstFile WIN32_FIND_DATA when access is
726        # denied. See issue 28075.
727        # os.environ['TEMP'] should be located on a volume that
728        # supports file ACLs.
729        fname = os.path.join(os.environ['TEMP'], self.fname)
730        self.addCleanup(os_helper.unlink, fname)
731        create_file(fname, b'ABC')
732        # Deny the right to [S]YNCHRONIZE on the file to
733        # force CreateFile to fail with ERROR_ACCESS_DENIED.
734        DETACHED_PROCESS = 8
735        subprocess.check_call(
736            # bpo-30584: Use security identifier *S-1-5-32-545 instead
737            # of localized "Users" to not depend on the locale.
738            ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
739            creationflags=DETACHED_PROCESS
740        )
741        result = os.stat(fname)
742        self.assertNotEqual(result.st_size, 0)
743
744    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
745    def test_stat_block_device(self):
746        # bpo-38030: os.stat fails for block devices
747        # Test a filename like "//./C:"
748        fname = "//./" + os.path.splitdrive(os.getcwd())[0]
749        result = os.stat(fname)
750        self.assertEqual(result.st_mode, stat.S_IFBLK)
751
752
753class UtimeTests(unittest.TestCase):
754    def setUp(self):
755        self.dirname = os_helper.TESTFN
756        self.fname = os.path.join(self.dirname, "f1")
757
758        self.addCleanup(os_helper.rmtree, self.dirname)
759        os.mkdir(self.dirname)
760        create_file(self.fname)
761
762    def support_subsecond(self, filename):
763        # Heuristic to check if the filesystem supports timestamp with
764        # subsecond resolution: check if float and int timestamps are different
765        st = os.stat(filename)
766        return ((st.st_atime != st[7])
767                or (st.st_mtime != st[8])
768                or (st.st_ctime != st[9]))
769
770    def _test_utime(self, set_time, filename=None):
771        if not filename:
772            filename = self.fname
773
774        support_subsecond = self.support_subsecond(filename)
775        if support_subsecond:
776            # Timestamp with a resolution of 1 microsecond (10^-6).
777            #
778            # The resolution of the C internal function used by os.utime()
779            # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
780            # test with a resolution of 1 ns requires more work:
781            # see the issue #15745.
782            atime_ns = 1002003000   # 1.002003 seconds
783            mtime_ns = 4005006000   # 4.005006 seconds
784        else:
785            # use a resolution of 1 second
786            atime_ns = 5 * 10**9
787            mtime_ns = 8 * 10**9
788
789        set_time(filename, (atime_ns, mtime_ns))
790        st = os.stat(filename)
791
792        if support_subsecond:
793            self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
794            self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
795        else:
796            self.assertEqual(st.st_atime, atime_ns * 1e-9)
797            self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
798        self.assertEqual(st.st_atime_ns, atime_ns)
799        self.assertEqual(st.st_mtime_ns, mtime_ns)
800
801    def test_utime(self):
802        def set_time(filename, ns):
803            # test the ns keyword parameter
804            os.utime(filename, ns=ns)
805        self._test_utime(set_time)
806
807    @staticmethod
808    def ns_to_sec(ns):
809        # Convert a number of nanosecond (int) to a number of seconds (float).
810        # Round towards infinity by adding 0.5 nanosecond to avoid rounding
811        # issue, os.utime() rounds towards minus infinity.
812        return (ns * 1e-9) + 0.5e-9
813
814    def test_utime_by_indexed(self):
815        # pass times as floating point seconds as the second indexed parameter
816        def set_time(filename, ns):
817            atime_ns, mtime_ns = ns
818            atime = self.ns_to_sec(atime_ns)
819            mtime = self.ns_to_sec(mtime_ns)
820            # test utimensat(timespec), utimes(timeval), utime(utimbuf)
821            # or utime(time_t)
822            os.utime(filename, (atime, mtime))
823        self._test_utime(set_time)
824
825    def test_utime_by_times(self):
826        def set_time(filename, ns):
827            atime_ns, mtime_ns = ns
828            atime = self.ns_to_sec(atime_ns)
829            mtime = self.ns_to_sec(mtime_ns)
830            # test the times keyword parameter
831            os.utime(filename, times=(atime, mtime))
832        self._test_utime(set_time)
833
834    @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
835                         "follow_symlinks support for utime required "
836                         "for this test.")
837    def test_utime_nofollow_symlinks(self):
838        def set_time(filename, ns):
839            # use follow_symlinks=False to test utimensat(timespec)
840            # or lutimes(timeval)
841            os.utime(filename, ns=ns, follow_symlinks=False)
842        self._test_utime(set_time)
843
844    @unittest.skipUnless(os.utime in os.supports_fd,
845                         "fd support for utime required for this test.")
846    def test_utime_fd(self):
847        def set_time(filename, ns):
848            with open(filename, 'wb', 0) as fp:
849                # use a file descriptor to test futimens(timespec)
850                # or futimes(timeval)
851                os.utime(fp.fileno(), ns=ns)
852        self._test_utime(set_time)
853
854    @unittest.skipUnless(os.utime in os.supports_dir_fd,
855                         "dir_fd support for utime required for this test.")
856    def test_utime_dir_fd(self):
857        def set_time(filename, ns):
858            dirname, name = os.path.split(filename)
859            with os_helper.open_dir_fd(dirname) as dirfd:
860                # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
861                os.utime(name, dir_fd=dirfd, ns=ns)
862        self._test_utime(set_time)
863
864    def test_utime_directory(self):
865        def set_time(filename, ns):
866            # test calling os.utime() on a directory
867            os.utime(filename, ns=ns)
868        self._test_utime(set_time, filename=self.dirname)
869
870    def _test_utime_current(self, set_time):
871        # Get the system clock
872        current = time.time()
873
874        # Call os.utime() to set the timestamp to the current system clock
875        set_time(self.fname)
876
877        if not self.support_subsecond(self.fname):
878            delta = 1.0
879        else:
880            # On Windows, the usual resolution of time.time() is 15.6 ms.
881            # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
882            #
883            # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
884            # also 50 ms on other platforms.
885            delta = 0.050
886        st = os.stat(self.fname)
887        msg = ("st_time=%r, current=%r, dt=%r"
888               % (st.st_mtime, current, st.st_mtime - current))
889        self.assertAlmostEqual(st.st_mtime, current,
890                               delta=delta, msg=msg)
891
892    def test_utime_current(self):
893        def set_time(filename):
894            # Set to the current time in the new way
895            os.utime(self.fname)
896        self._test_utime_current(set_time)
897
898    def test_utime_current_old(self):
899        def set_time(filename):
900            # Set to the current time in the old explicit way.
901            os.utime(self.fname, None)
902        self._test_utime_current(set_time)
903
904    def get_file_system(self, path):
905        if sys.platform == 'win32':
906            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
907            import ctypes
908            kernel32 = ctypes.windll.kernel32
909            buf = ctypes.create_unicode_buffer("", 100)
910            ok = kernel32.GetVolumeInformationW(root, None, 0,
911                                                None, None, None,
912                                                buf, len(buf))
913            if ok:
914                return buf.value
915        # return None if the filesystem is unknown
916
917    def test_large_time(self):
918        # Many filesystems are limited to the year 2038. At least, the test
919        # pass with NTFS filesystem.
920        if self.get_file_system(self.dirname) != "NTFS":
921            self.skipTest("requires NTFS")
922
923        large = 5000000000   # some day in 2128
924        os.utime(self.fname, (large, large))
925        self.assertEqual(os.stat(self.fname).st_mtime, large)
926
927    def test_utime_invalid_arguments(self):
928        # seconds and nanoseconds parameters are mutually exclusive
929        with self.assertRaises(ValueError):
930            os.utime(self.fname, (5, 5), ns=(5, 5))
931        with self.assertRaises(TypeError):
932            os.utime(self.fname, [5, 5])
933        with self.assertRaises(TypeError):
934            os.utime(self.fname, (5,))
935        with self.assertRaises(TypeError):
936            os.utime(self.fname, (5, 5, 5))
937        with self.assertRaises(TypeError):
938            os.utime(self.fname, ns=[5, 5])
939        with self.assertRaises(TypeError):
940            os.utime(self.fname, ns=(5,))
941        with self.assertRaises(TypeError):
942            os.utime(self.fname, ns=(5, 5, 5))
943
944        if os.utime not in os.supports_follow_symlinks:
945            with self.assertRaises(NotImplementedError):
946                os.utime(self.fname, (5, 5), follow_symlinks=False)
947        if os.utime not in os.supports_fd:
948            with open(self.fname, 'wb', 0) as fp:
949                with self.assertRaises(TypeError):
950                    os.utime(fp.fileno(), (5, 5))
951        if os.utime not in os.supports_dir_fd:
952            with self.assertRaises(NotImplementedError):
953                os.utime(self.fname, (5, 5), dir_fd=0)
954
955    @support.cpython_only
956    def test_issue31577(self):
957        # The interpreter shouldn't crash in case utime() received a bad
958        # ns argument.
959        def get_bad_int(divmod_ret_val):
960            class BadInt:
961                def __divmod__(*args):
962                    return divmod_ret_val
963            return BadInt()
964        with self.assertRaises(TypeError):
965            os.utime(self.fname, ns=(get_bad_int(42), 1))
966        with self.assertRaises(TypeError):
967            os.utime(self.fname, ns=(get_bad_int(()), 1))
968        with self.assertRaises(TypeError):
969            os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1))
970
971
972from test import mapping_tests
973
974class EnvironTests(mapping_tests.BasicTestMappingProtocol):
975    """check that os.environ object conform to mapping protocol"""
976    type2test = None
977
978    def setUp(self):
979        self.__save = dict(os.environ)
980        if os.supports_bytes_environ:
981            self.__saveb = dict(os.environb)
982        for key, value in self._reference().items():
983            os.environ[key] = value
984
985    def tearDown(self):
986        os.environ.clear()
987        os.environ.update(self.__save)
988        if os.supports_bytes_environ:
989            os.environb.clear()
990            os.environb.update(self.__saveb)
991
992    def _reference(self):
993        return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
994
995    def _empty_mapping(self):
996        os.environ.clear()
997        return os.environ
998
999    # Bug 1110478
1000    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
1001                         'requires a shell')
1002    @unittest.skipUnless(hasattr(os, 'popen'), "needs os.popen()")
1003    @support.requires_subprocess()
1004    def test_update2(self):
1005        os.environ.clear()
1006        os.environ.update(HELLO="World")
1007        with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
1008            value = popen.read().strip()
1009            self.assertEqual(value, "World")
1010
1011    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
1012                         'requires a shell')
1013    @unittest.skipUnless(hasattr(os, 'popen'), "needs os.popen()")
1014    @support.requires_subprocess()
1015    def test_os_popen_iter(self):
1016        with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
1017                      % unix_shell) as popen:
1018            it = iter(popen)
1019            self.assertEqual(next(it), "line1\n")
1020            self.assertEqual(next(it), "line2\n")
1021            self.assertEqual(next(it), "line3\n")
1022            self.assertRaises(StopIteration, next, it)
1023
1024    # Verify environ keys and values from the OS are of the
1025    # correct str type.
1026    def test_keyvalue_types(self):
1027        for key, val in os.environ.items():
1028            self.assertEqual(type(key), str)
1029            self.assertEqual(type(val), str)
1030
1031    def test_items(self):
1032        for key, value in self._reference().items():
1033            self.assertEqual(os.environ.get(key), value)
1034
1035    # Issue 7310
1036    def test___repr__(self):
1037        """Check that the repr() of os.environ looks like environ({...})."""
1038        env = os.environ
1039        formatted_items = ", ".join(
1040            f"{key!r}: {value!r}"
1041            for key, value in env.items()
1042        )
1043        self.assertEqual(repr(env), f"environ({{{formatted_items}}})")
1044
1045    def test_get_exec_path(self):
1046        defpath_list = os.defpath.split(os.pathsep)
1047        test_path = ['/monty', '/python', '', '/flying/circus']
1048        test_env = {'PATH': os.pathsep.join(test_path)}
1049
1050        saved_environ = os.environ
1051        try:
1052            os.environ = dict(test_env)
1053            # Test that defaulting to os.environ works.
1054            self.assertSequenceEqual(test_path, os.get_exec_path())
1055            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
1056        finally:
1057            os.environ = saved_environ
1058
1059        # No PATH environment variable
1060        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
1061        # Empty PATH environment variable
1062        self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
1063        # Supplied PATH environment variable
1064        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
1065
1066        if os.supports_bytes_environ:
1067            # env cannot contain 'PATH' and b'PATH' keys
1068            try:
1069                # ignore BytesWarning warning
1070                with warnings.catch_warnings(record=True):
1071                    mixed_env = {'PATH': '1', b'PATH': b'2'}
1072            except BytesWarning:
1073                # mixed_env cannot be created with python -bb
1074                pass
1075            else:
1076                self.assertRaises(ValueError, os.get_exec_path, mixed_env)
1077
1078            # bytes key and/or value
1079            self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
1080                ['abc'])
1081            self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
1082                ['abc'])
1083            self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
1084                ['abc'])
1085
1086    @unittest.skipUnless(os.supports_bytes_environ,
1087                         "os.environb required for this test.")
1088    def test_environb(self):
1089        # os.environ -> os.environb
1090        value = 'euro\u20ac'
1091        try:
1092            value_bytes = value.encode(sys.getfilesystemencoding(),
1093                                       'surrogateescape')
1094        except UnicodeEncodeError:
1095            msg = "U+20AC character is not encodable to %s" % (
1096                sys.getfilesystemencoding(),)
1097            self.skipTest(msg)
1098        os.environ['unicode'] = value
1099        self.assertEqual(os.environ['unicode'], value)
1100        self.assertEqual(os.environb[b'unicode'], value_bytes)
1101
1102        # os.environb -> os.environ
1103        value = b'\xff'
1104        os.environb[b'bytes'] = value
1105        self.assertEqual(os.environb[b'bytes'], value)
1106        value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
1107        self.assertEqual(os.environ['bytes'], value_str)
1108
1109    @support.requires_subprocess()
1110    def test_putenv_unsetenv(self):
1111        name = "PYTHONTESTVAR"
1112        value = "testvalue"
1113        code = f'import os; print(repr(os.environ.get({name!r})))'
1114
1115        with os_helper.EnvironmentVarGuard() as env:
1116            env.pop(name, None)
1117
1118            os.putenv(name, value)
1119            proc = subprocess.run([sys.executable, '-c', code], check=True,
1120                                  stdout=subprocess.PIPE, text=True)
1121            self.assertEqual(proc.stdout.rstrip(), repr(value))
1122
1123            os.unsetenv(name)
1124            proc = subprocess.run([sys.executable, '-c', code], check=True,
1125                                  stdout=subprocess.PIPE, text=True)
1126            self.assertEqual(proc.stdout.rstrip(), repr(None))
1127
1128    # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
1129    @support.requires_mac_ver(10, 6)
1130    def test_putenv_unsetenv_error(self):
1131        # Empty variable name is invalid.
1132        # "=" and null character are not allowed in a variable name.
1133        for name in ('', '=name', 'na=me', 'name=', 'name\0', 'na\0me'):
1134            self.assertRaises((OSError, ValueError), os.putenv, name, "value")
1135            self.assertRaises((OSError, ValueError), os.unsetenv, name)
1136
1137        if sys.platform == "win32":
1138            # On Windows, an environment variable string ("name=value" string)
1139            # is limited to 32,767 characters
1140            longstr = 'x' * 32_768
1141            self.assertRaises(ValueError, os.putenv, longstr, "1")
1142            self.assertRaises(ValueError, os.putenv, "X", longstr)
1143            self.assertRaises(ValueError, os.unsetenv, longstr)
1144
1145    def test_key_type(self):
1146        missing = 'missingkey'
1147        self.assertNotIn(missing, os.environ)
1148
1149        with self.assertRaises(KeyError) as cm:
1150            os.environ[missing]
1151        self.assertIs(cm.exception.args[0], missing)
1152        self.assertTrue(cm.exception.__suppress_context__)
1153
1154        with self.assertRaises(KeyError) as cm:
1155            del os.environ[missing]
1156        self.assertIs(cm.exception.args[0], missing)
1157        self.assertTrue(cm.exception.__suppress_context__)
1158
1159    def _test_environ_iteration(self, collection):
1160        iterator = iter(collection)
1161        new_key = "__new_key__"
1162
1163        next(iterator)  # start iteration over os.environ.items
1164
1165        # add a new key in os.environ mapping
1166        os.environ[new_key] = "test_environ_iteration"
1167
1168        try:
1169            next(iterator)  # force iteration over modified mapping
1170            self.assertEqual(os.environ[new_key], "test_environ_iteration")
1171        finally:
1172            del os.environ[new_key]
1173
1174    def test_iter_error_when_changing_os_environ(self):
1175        self._test_environ_iteration(os.environ)
1176
1177    def test_iter_error_when_changing_os_environ_items(self):
1178        self._test_environ_iteration(os.environ.items())
1179
1180    def test_iter_error_when_changing_os_environ_values(self):
1181        self._test_environ_iteration(os.environ.values())
1182
1183    def _test_underlying_process_env(self, var, expected):
1184        if not (unix_shell and os.path.exists(unix_shell)):
1185            return
1186        elif not support.has_subprocess_support:
1187            return
1188
1189        with os.popen(f"{unix_shell} -c 'echo ${var}'") as popen:
1190            value = popen.read().strip()
1191
1192        self.assertEqual(expected, value)
1193
1194    def test_or_operator(self):
1195        overridden_key = '_TEST_VAR_'
1196        original_value = 'original_value'
1197        os.environ[overridden_key] = original_value
1198
1199        new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
1200        expected = dict(os.environ)
1201        expected.update(new_vars_dict)
1202
1203        actual = os.environ | new_vars_dict
1204        self.assertDictEqual(expected, actual)
1205        self.assertEqual('3', actual[overridden_key])
1206
1207        new_vars_items = new_vars_dict.items()
1208        self.assertIs(NotImplemented, os.environ.__or__(new_vars_items))
1209
1210        self._test_underlying_process_env('_A_', '')
1211        self._test_underlying_process_env(overridden_key, original_value)
1212
1213    def test_ior_operator(self):
1214        overridden_key = '_TEST_VAR_'
1215        os.environ[overridden_key] = 'original_value'
1216
1217        new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
1218        expected = dict(os.environ)
1219        expected.update(new_vars_dict)
1220
1221        os.environ |= new_vars_dict
1222        self.assertEqual(expected, os.environ)
1223        self.assertEqual('3', os.environ[overridden_key])
1224
1225        self._test_underlying_process_env('_A_', '1')
1226        self._test_underlying_process_env(overridden_key, '3')
1227
1228    def test_ior_operator_invalid_dicts(self):
1229        os_environ_copy = os.environ.copy()
1230        with self.assertRaises(TypeError):
1231            dict_with_bad_key = {1: '_A_'}
1232            os.environ |= dict_with_bad_key
1233
1234        with self.assertRaises(TypeError):
1235            dict_with_bad_val = {'_A_': 1}
1236            os.environ |= dict_with_bad_val
1237
1238        # Check nothing was added.
1239        self.assertEqual(os_environ_copy, os.environ)
1240
1241    def test_ior_operator_key_value_iterable(self):
1242        overridden_key = '_TEST_VAR_'
1243        os.environ[overridden_key] = 'original_value'
1244
1245        new_vars_items = (('_A_', '1'), ('_B_', '2'), (overridden_key, '3'))
1246        expected = dict(os.environ)
1247        expected.update(new_vars_items)
1248
1249        os.environ |= new_vars_items
1250        self.assertEqual(expected, os.environ)
1251        self.assertEqual('3', os.environ[overridden_key])
1252
1253        self._test_underlying_process_env('_A_', '1')
1254        self._test_underlying_process_env(overridden_key, '3')
1255
1256    def test_ror_operator(self):
1257        overridden_key = '_TEST_VAR_'
1258        original_value = 'original_value'
1259        os.environ[overridden_key] = original_value
1260
1261        new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
1262        expected = dict(new_vars_dict)
1263        expected.update(os.environ)
1264
1265        actual = new_vars_dict | os.environ
1266        self.assertDictEqual(expected, actual)
1267        self.assertEqual(original_value, actual[overridden_key])
1268
1269        new_vars_items = new_vars_dict.items()
1270        self.assertIs(NotImplemented, os.environ.__ror__(new_vars_items))
1271
1272        self._test_underlying_process_env('_A_', '')
1273        self._test_underlying_process_env(overridden_key, original_value)
1274
1275
1276class WalkTests(unittest.TestCase):
1277    """Tests for os.walk()."""
1278
1279    # Wrapper to hide minor differences between os.walk and os.fwalk
1280    # to tests both functions with the same code base
1281    def walk(self, top, **kwargs):
1282        if 'follow_symlinks' in kwargs:
1283            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
1284        return os.walk(top, **kwargs)
1285
1286    def setUp(self):
1287        join = os.path.join
1288        self.addCleanup(os_helper.rmtree, os_helper.TESTFN)
1289
1290        # Build:
1291        #     TESTFN/
1292        #       TEST1/              a file kid and two directory kids
1293        #         tmp1
1294        #         SUB1/             a file kid and a directory kid
1295        #           tmp2
1296        #           SUB11/          no kids
1297        #         SUB2/             a file kid and a dirsymlink kid
1298        #           tmp3
1299        #           SUB21/          not readable
1300        #             tmp5
1301        #           link/           a symlink to TESTFN.2
1302        #           broken_link
1303        #           broken_link2
1304        #           broken_link3
1305        #       TEST2/
1306        #         tmp4              a lone file
1307        self.walk_path = join(os_helper.TESTFN, "TEST1")
1308        self.sub1_path = join(self.walk_path, "SUB1")
1309        self.sub11_path = join(self.sub1_path, "SUB11")
1310        sub2_path = join(self.walk_path, "SUB2")
1311        sub21_path = join(sub2_path, "SUB21")
1312        tmp1_path = join(self.walk_path, "tmp1")
1313        tmp2_path = join(self.sub1_path, "tmp2")
1314        tmp3_path = join(sub2_path, "tmp3")
1315        tmp5_path = join(sub21_path, "tmp3")
1316        self.link_path = join(sub2_path, "link")
1317        t2_path = join(os_helper.TESTFN, "TEST2")
1318        tmp4_path = join(os_helper.TESTFN, "TEST2", "tmp4")
1319        broken_link_path = join(sub2_path, "broken_link")
1320        broken_link2_path = join(sub2_path, "broken_link2")
1321        broken_link3_path = join(sub2_path, "broken_link3")
1322
1323        # Create stuff.
1324        os.makedirs(self.sub11_path)
1325        os.makedirs(sub2_path)
1326        os.makedirs(sub21_path)
1327        os.makedirs(t2_path)
1328
1329        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
1330            with open(path, "x", encoding='utf-8') as f:
1331                f.write("I'm " + path + " and proud of it.  Blame test_os.\n")
1332
1333        if os_helper.can_symlink():
1334            os.symlink(os.path.abspath(t2_path), self.link_path)
1335            os.symlink('broken', broken_link_path, True)
1336            os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
1337            os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
1338            self.sub2_tree = (sub2_path, ["SUB21", "link"],
1339                              ["broken_link", "broken_link2", "broken_link3",
1340                               "tmp3"])
1341        else:
1342            self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])
1343
1344        if not support.is_emscripten:
1345            # Emscripten fails with inaccessible directory
1346            os.chmod(sub21_path, 0)
1347        try:
1348            os.listdir(sub21_path)
1349        except PermissionError:
1350            self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
1351        else:
1352            os.chmod(sub21_path, stat.S_IRWXU)
1353            os.unlink(tmp5_path)
1354            os.rmdir(sub21_path)
1355            del self.sub2_tree[1][:1]
1356
1357    def test_walk_topdown(self):
1358        # Walk top-down.
1359        all = list(self.walk(self.walk_path))
1360
1361        self.assertEqual(len(all), 4)
1362        # We can't know which order SUB1 and SUB2 will appear in.
1363        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
1364        #     flipped:  TESTFN, SUB2, SUB1, SUB11
1365        flipped = all[0][1][0] != "SUB1"
1366        all[0][1].sort()
1367        all[3 - 2 * flipped][-1].sort()
1368        all[3 - 2 * flipped][1].sort()
1369        self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1370        self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
1371        self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
1372        self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
1373
1374    def test_walk_prune(self, walk_path=None):
1375        if walk_path is None:
1376            walk_path = self.walk_path
1377        # Prune the search.
1378        all = []
1379        for root, dirs, files in self.walk(walk_path):
1380            all.append((root, dirs, files))
1381            # Don't descend into SUB1.
1382            if 'SUB1' in dirs:
1383                # Note that this also mutates the dirs we appended to all!
1384                dirs.remove('SUB1')
1385
1386        self.assertEqual(len(all), 2)
1387        self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"]))
1388
1389        all[1][-1].sort()
1390        all[1][1].sort()
1391        self.assertEqual(all[1], self.sub2_tree)
1392
1393    def test_file_like_path(self):
1394        self.test_walk_prune(FakePath(self.walk_path))
1395
1396    def test_walk_bottom_up(self):
1397        # Walk bottom-up.
1398        all = list(self.walk(self.walk_path, topdown=False))
1399
1400        self.assertEqual(len(all), 4, all)
1401        # We can't know which order SUB1 and SUB2 will appear in.
1402        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
1403        #     flipped:  SUB2, SUB11, SUB1, TESTFN
1404        flipped = all[3][1][0] != "SUB1"
1405        all[3][1].sort()
1406        all[2 - 2 * flipped][-1].sort()
1407        all[2 - 2 * flipped][1].sort()
1408        self.assertEqual(all[3],
1409                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1410        self.assertEqual(all[flipped],
1411                         (self.sub11_path, [], []))
1412        self.assertEqual(all[flipped + 1],
1413                         (self.sub1_path, ["SUB11"], ["tmp2"]))
1414        self.assertEqual(all[2 - 2 * flipped],
1415                         self.sub2_tree)
1416
1417    def test_walk_symlink(self):
1418        if not os_helper.can_symlink():
1419            self.skipTest("need symlink support")
1420
1421        # Walk, following symlinks.
1422        walk_it = self.walk(self.walk_path, follow_symlinks=True)
1423        for root, dirs, files in walk_it:
1424            if root == self.link_path:
1425                self.assertEqual(dirs, [])
1426                self.assertEqual(files, ["tmp4"])
1427                break
1428        else:
1429            self.fail("Didn't follow symlink with followlinks=True")
1430
1431    def test_walk_bad_dir(self):
1432        # Walk top-down.
1433        errors = []
1434        walk_it = self.walk(self.walk_path, onerror=errors.append)
1435        root, dirs, files = next(walk_it)
1436        self.assertEqual(errors, [])
1437        dir1 = 'SUB1'
1438        path1 = os.path.join(root, dir1)
1439        path1new = os.path.join(root, dir1 + '.new')
1440        os.rename(path1, path1new)
1441        try:
1442            roots = [r for r, d, f in walk_it]
1443            self.assertTrue(errors)
1444            self.assertNotIn(path1, roots)
1445            self.assertNotIn(path1new, roots)
1446            for dir2 in dirs:
1447                if dir2 != dir1:
1448                    self.assertIn(os.path.join(root, dir2), roots)
1449        finally:
1450            os.rename(path1new, path1)
1451
1452    def test_walk_many_open_files(self):
1453        depth = 30
1454        base = os.path.join(os_helper.TESTFN, 'deep')
1455        p = os.path.join(base, *(['d']*depth))
1456        os.makedirs(p)
1457
1458        iters = [self.walk(base, topdown=False) for j in range(100)]
1459        for i in range(depth + 1):
1460            expected = (p, ['d'] if i else [], [])
1461            for it in iters:
1462                self.assertEqual(next(it), expected)
1463            p = os.path.dirname(p)
1464
1465        iters = [self.walk(base, topdown=True) for j in range(100)]
1466        p = base
1467        for i in range(depth + 1):
1468            expected = (p, ['d'] if i < depth else [], [])
1469            for it in iters:
1470                self.assertEqual(next(it), expected)
1471            p = os.path.join(p, 'd')
1472
1473
1474@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1475class FwalkTests(WalkTests):
1476    """Tests for os.fwalk()."""
1477
1478    def walk(self, top, **kwargs):
1479        for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
1480            yield (root, dirs, files)
1481
1482    def fwalk(self, *args, **kwargs):
1483        return os.fwalk(*args, **kwargs)
1484
1485    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
1486        """
1487        compare with walk() results.
1488        """
1489        walk_kwargs = walk_kwargs.copy()
1490        fwalk_kwargs = fwalk_kwargs.copy()
1491        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1492            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
1493            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
1494
1495            expected = {}
1496            for root, dirs, files in os.walk(**walk_kwargs):
1497                expected[root] = (set(dirs), set(files))
1498
1499            for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
1500                self.assertIn(root, expected)
1501                self.assertEqual(expected[root], (set(dirs), set(files)))
1502
1503    def test_compare_to_walk(self):
1504        kwargs = {'top': os_helper.TESTFN}
1505        self._compare_to_walk(kwargs, kwargs)
1506
1507    def test_dir_fd(self):
1508        try:
1509            fd = os.open(".", os.O_RDONLY)
1510            walk_kwargs = {'top': os_helper.TESTFN}
1511            fwalk_kwargs = walk_kwargs.copy()
1512            fwalk_kwargs['dir_fd'] = fd
1513            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
1514        finally:
1515            os.close(fd)
1516
1517    def test_yields_correct_dir_fd(self):
1518        # check returned file descriptors
1519        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1520            args = os_helper.TESTFN, topdown, None
1521            for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
1522                # check that the FD is valid
1523                os.fstat(rootfd)
1524                # redundant check
1525                os.stat(rootfd)
1526                # check that listdir() returns consistent information
1527                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
1528
1529    @unittest.skipIf(
1530        support.is_emscripten, "Cannot dup stdout on Emscripten"
1531    )
1532    def test_fd_leak(self):
1533        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
1534        # we both check that calling fwalk() a large number of times doesn't
1535        # yield EMFILE, and that the minimum allocated FD hasn't changed.
1536        minfd = os.dup(1)
1537        os.close(minfd)
1538        for i in range(256):
1539            for x in self.fwalk(os_helper.TESTFN):
1540                pass
1541        newfd = os.dup(1)
1542        self.addCleanup(os.close, newfd)
1543        self.assertEqual(newfd, minfd)
1544
1545    # fwalk() keeps file descriptors open
1546    test_walk_many_open_files = None
1547
1548
1549class BytesWalkTests(WalkTests):
1550    """Tests for os.walk() with bytes."""
1551    def walk(self, top, **kwargs):
1552        if 'follow_symlinks' in kwargs:
1553            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
1554        for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
1555            root = os.fsdecode(broot)
1556            dirs = list(map(os.fsdecode, bdirs))
1557            files = list(map(os.fsdecode, bfiles))
1558            yield (root, dirs, files)
1559            bdirs[:] = list(map(os.fsencode, dirs))
1560            bfiles[:] = list(map(os.fsencode, files))
1561
1562@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1563class BytesFwalkTests(FwalkTests):
1564    """Tests for os.walk() with bytes."""
1565    def fwalk(self, top='.', *args, **kwargs):
1566        for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
1567            root = os.fsdecode(broot)
1568            dirs = list(map(os.fsdecode, bdirs))
1569            files = list(map(os.fsdecode, bfiles))
1570            yield (root, dirs, files, topfd)
1571            bdirs[:] = list(map(os.fsencode, dirs))
1572            bfiles[:] = list(map(os.fsencode, files))
1573
1574
1575class MakedirTests(unittest.TestCase):
1576    def setUp(self):
1577        os.mkdir(os_helper.TESTFN)
1578
1579    def test_makedir(self):
1580        base = os_helper.TESTFN
1581        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
1582        os.makedirs(path)             # Should work
1583        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
1584        os.makedirs(path)
1585
1586        # Try paths with a '.' in them
1587        self.assertRaises(OSError, os.makedirs, os.curdir)
1588        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
1589        os.makedirs(path)
1590        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
1591                            'dir5', 'dir6')
1592        os.makedirs(path)
1593
1594    @unittest.skipIf(
1595        support.is_emscripten or support.is_wasi,
1596        "Emscripten's/WASI's umask is a stub."
1597    )
1598    def test_mode(self):
1599        with os_helper.temp_umask(0o002):
1600            base = os_helper.TESTFN
1601            parent = os.path.join(base, 'dir1')
1602            path = os.path.join(parent, 'dir2')
1603            os.makedirs(path, 0o555)
1604            self.assertTrue(os.path.exists(path))
1605            self.assertTrue(os.path.isdir(path))
1606            if os.name != 'nt':
1607                self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
1608                self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)
1609
1610    @unittest.skipIf(
1611        support.is_emscripten or support.is_wasi,
1612        "Emscripten's/WASI's umask is a stub."
1613    )
1614    def test_exist_ok_existing_directory(self):
1615        path = os.path.join(os_helper.TESTFN, 'dir1')
1616        mode = 0o777
1617        old_mask = os.umask(0o022)
1618        os.makedirs(path, mode)
1619        self.assertRaises(OSError, os.makedirs, path, mode)
1620        self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
1621        os.makedirs(path, 0o776, exist_ok=True)
1622        os.makedirs(path, mode=mode, exist_ok=True)
1623        os.umask(old_mask)
1624
1625        # Issue #25583: A drive root could raise PermissionError on Windows
1626        os.makedirs(os.path.abspath('/'), exist_ok=True)
1627
1628    @unittest.skipIf(
1629        support.is_emscripten or support.is_wasi,
1630        "Emscripten's/WASI's umask is a stub."
1631    )
1632    def test_exist_ok_s_isgid_directory(self):
1633        path = os.path.join(os_helper.TESTFN, 'dir1')
1634        S_ISGID = stat.S_ISGID
1635        mode = 0o777
1636        old_mask = os.umask(0o022)
1637        try:
1638            existing_testfn_mode = stat.S_IMODE(
1639                    os.lstat(os_helper.TESTFN).st_mode)
1640            try:
1641                os.chmod(os_helper.TESTFN, existing_testfn_mode | S_ISGID)
1642            except PermissionError:
1643                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
1644            if (os.lstat(os_helper.TESTFN).st_mode & S_ISGID != S_ISGID):
1645                raise unittest.SkipTest('No support for S_ISGID dir mode.')
1646            # The os should apply S_ISGID from the parent dir for us, but
1647            # this test need not depend on that behavior.  Be explicit.
1648            os.makedirs(path, mode | S_ISGID)
1649            # http://bugs.python.org/issue14992
1650            # Should not fail when the bit is already set.
1651            os.makedirs(path, mode, exist_ok=True)
1652            # remove the bit.
1653            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
1654            # May work even when the bit is not already set when demanded.
1655            os.makedirs(path, mode | S_ISGID, exist_ok=True)
1656        finally:
1657            os.umask(old_mask)
1658
1659    def test_exist_ok_existing_regular_file(self):
1660        base = os_helper.TESTFN
1661        path = os.path.join(os_helper.TESTFN, 'dir1')
1662        with open(path, 'w', encoding='utf-8') as f:
1663            f.write('abc')
1664        self.assertRaises(OSError, os.makedirs, path)
1665        self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
1666        self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
1667        os.remove(path)
1668
1669    def tearDown(self):
1670        path = os.path.join(os_helper.TESTFN, 'dir1', 'dir2', 'dir3',
1671                            'dir4', 'dir5', 'dir6')
1672        # If the tests failed, the bottom-most directory ('../dir6')
1673        # may not have been created, so we look for the outermost directory
1674        # that exists.
1675        while not os.path.exists(path) and path != os_helper.TESTFN:
1676            path = os.path.dirname(path)
1677
1678        os.removedirs(path)
1679
1680
1681@os_helper.skip_unless_working_chmod
1682class ChownFileTests(unittest.TestCase):
1683
1684    @classmethod
1685    def setUpClass(cls):
1686        os.mkdir(os_helper.TESTFN)
1687
1688    def test_chown_uid_gid_arguments_must_be_index(self):
1689        stat = os.stat(os_helper.TESTFN)
1690        uid = stat.st_uid
1691        gid = stat.st_gid
1692        for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
1693            self.assertRaises(TypeError, os.chown, os_helper.TESTFN, value, gid)
1694            self.assertRaises(TypeError, os.chown, os_helper.TESTFN, uid, value)
1695        self.assertIsNone(os.chown(os_helper.TESTFN, uid, gid))
1696        self.assertIsNone(os.chown(os_helper.TESTFN, -1, -1))
1697
1698    @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups')
1699    def test_chown_gid(self):
1700        groups = os.getgroups()
1701        if len(groups) < 2:
1702            self.skipTest("test needs at least 2 groups")
1703
1704        gid_1, gid_2 = groups[:2]
1705        uid = os.stat(os_helper.TESTFN).st_uid
1706
1707        os.chown(os_helper.TESTFN, uid, gid_1)
1708        gid = os.stat(os_helper.TESTFN).st_gid
1709        self.assertEqual(gid, gid_1)
1710
1711        os.chown(os_helper.TESTFN, uid, gid_2)
1712        gid = os.stat(os_helper.TESTFN).st_gid
1713        self.assertEqual(gid, gid_2)
1714
1715    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
1716                         "test needs root privilege and more than one user")
1717    def test_chown_with_root(self):
1718        uid_1, uid_2 = all_users[:2]
1719        gid = os.stat(os_helper.TESTFN).st_gid
1720        os.chown(os_helper.TESTFN, uid_1, gid)
1721        uid = os.stat(os_helper.TESTFN).st_uid
1722        self.assertEqual(uid, uid_1)
1723        os.chown(os_helper.TESTFN, uid_2, gid)
1724        uid = os.stat(os_helper.TESTFN).st_uid
1725        self.assertEqual(uid, uid_2)
1726
1727    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
1728                         "test needs non-root account and more than one user")
1729    def test_chown_without_permission(self):
1730        uid_1, uid_2 = all_users[:2]
1731        gid = os.stat(os_helper.TESTFN).st_gid
1732        with self.assertRaises(PermissionError):
1733            os.chown(os_helper.TESTFN, uid_1, gid)
1734            os.chown(os_helper.TESTFN, uid_2, gid)
1735
1736    @classmethod
1737    def tearDownClass(cls):
1738        os.rmdir(os_helper.TESTFN)
1739
1740
1741class RemoveDirsTests(unittest.TestCase):
1742    def setUp(self):
1743        os.makedirs(os_helper.TESTFN)
1744
1745    def tearDown(self):
1746        os_helper.rmtree(os_helper.TESTFN)
1747
1748    def test_remove_all(self):
1749        dira = os.path.join(os_helper.TESTFN, 'dira')
1750        os.mkdir(dira)
1751        dirb = os.path.join(dira, 'dirb')
1752        os.mkdir(dirb)
1753        os.removedirs(dirb)
1754        self.assertFalse(os.path.exists(dirb))
1755        self.assertFalse(os.path.exists(dira))
1756        self.assertFalse(os.path.exists(os_helper.TESTFN))
1757
1758    def test_remove_partial(self):
1759        dira = os.path.join(os_helper.TESTFN, 'dira')
1760        os.mkdir(dira)
1761        dirb = os.path.join(dira, 'dirb')
1762        os.mkdir(dirb)
1763        create_file(os.path.join(dira, 'file.txt'))
1764        os.removedirs(dirb)
1765        self.assertFalse(os.path.exists(dirb))
1766        self.assertTrue(os.path.exists(dira))
1767        self.assertTrue(os.path.exists(os_helper.TESTFN))
1768
1769    def test_remove_nothing(self):
1770        dira = os.path.join(os_helper.TESTFN, 'dira')
1771        os.mkdir(dira)
1772        dirb = os.path.join(dira, 'dirb')
1773        os.mkdir(dirb)
1774        create_file(os.path.join(dirb, 'file.txt'))
1775        with self.assertRaises(OSError):
1776            os.removedirs(dirb)
1777        self.assertTrue(os.path.exists(dirb))
1778        self.assertTrue(os.path.exists(dira))
1779        self.assertTrue(os.path.exists(os_helper.TESTFN))
1780
1781
1782@unittest.skipIf(support.is_wasi, "WASI has no /dev/null")
1783class DevNullTests(unittest.TestCase):
1784    def test_devnull(self):
1785        with open(os.devnull, 'wb', 0) as f:
1786            f.write(b'hello')
1787            f.close()
1788        with open(os.devnull, 'rb') as f:
1789            self.assertEqual(f.read(), b'')
1790
1791
1792class URandomTests(unittest.TestCase):
1793    def test_urandom_length(self):
1794        self.assertEqual(len(os.urandom(0)), 0)
1795        self.assertEqual(len(os.urandom(1)), 1)
1796        self.assertEqual(len(os.urandom(10)), 10)
1797        self.assertEqual(len(os.urandom(100)), 100)
1798        self.assertEqual(len(os.urandom(1000)), 1000)
1799
1800    def test_urandom_value(self):
1801        data1 = os.urandom(16)
1802        self.assertIsInstance(data1, bytes)
1803        data2 = os.urandom(16)
1804        self.assertNotEqual(data1, data2)
1805
1806    def get_urandom_subprocess(self, count):
1807        code = '\n'.join((
1808            'import os, sys',
1809            'data = os.urandom(%s)' % count,
1810            'sys.stdout.buffer.write(data)',
1811            'sys.stdout.buffer.flush()'))
1812        out = assert_python_ok('-c', code)
1813        stdout = out[1]
1814        self.assertEqual(len(stdout), count)
1815        return stdout
1816
1817    def test_urandom_subprocess(self):
1818        data1 = self.get_urandom_subprocess(16)
1819        data2 = self.get_urandom_subprocess(16)
1820        self.assertNotEqual(data1, data2)
1821
1822
1823@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
1824class GetRandomTests(unittest.TestCase):
1825    @classmethod
1826    def setUpClass(cls):
1827        try:
1828            os.getrandom(1)
1829        except OSError as exc:
1830            if exc.errno == errno.ENOSYS:
1831                # Python compiled on a more recent Linux version
1832                # than the current Linux kernel
1833                raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")
1834            else:
1835                raise
1836
1837    def test_getrandom_type(self):
1838        data = os.getrandom(16)
1839        self.assertIsInstance(data, bytes)
1840        self.assertEqual(len(data), 16)
1841
1842    def test_getrandom0(self):
1843        empty = os.getrandom(0)
1844        self.assertEqual(empty, b'')
1845
1846    def test_getrandom_random(self):
1847        self.assertTrue(hasattr(os, 'GRND_RANDOM'))
1848
1849        # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
1850        # resource /dev/random
1851
1852    def test_getrandom_nonblock(self):
1853        # The call must not fail. Check also that the flag exists
1854        try:
1855            os.getrandom(1, os.GRND_NONBLOCK)
1856        except BlockingIOError:
1857            # System urandom is not initialized yet
1858            pass
1859
1860    def test_getrandom_value(self):
1861        data1 = os.getrandom(16)
1862        data2 = os.getrandom(16)
1863        self.assertNotEqual(data1, data2)
1864
1865
1866# os.urandom() doesn't use a file descriptor when it is implemented with the
1867# getentropy() function, the getrandom() function or the getrandom() syscall
1868OS_URANDOM_DONT_USE_FD = (
1869    sysconfig.get_config_var('HAVE_GETENTROPY') == 1
1870    or sysconfig.get_config_var('HAVE_GETRANDOM') == 1
1871    or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)
1872
1873@unittest.skipIf(OS_URANDOM_DONT_USE_FD ,
1874                 "os.random() does not use a file descriptor")
1875@unittest.skipIf(sys.platform == "vxworks",
1876                 "VxWorks can't set RLIMIT_NOFILE to 1")
1877class URandomFDTests(unittest.TestCase):
1878    @unittest.skipUnless(resource, "test requires the resource module")
1879    def test_urandom_failure(self):
1880        # Check urandom() failing when it is not able to open /dev/random.
1881        # We spawn a new process to make the test more robust (if getrlimit()
1882        # failed to restore the file descriptor limit after this, the whole
1883        # test suite would crash; this actually happened on the OS X Tiger
1884        # buildbot).
1885        code = """if 1:
1886            import errno
1887            import os
1888            import resource
1889
1890            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1891            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1892            try:
1893                os.urandom(16)
1894            except OSError as e:
1895                assert e.errno == errno.EMFILE, e.errno
1896            else:
1897                raise AssertionError("OSError not raised")
1898            """
1899        assert_python_ok('-c', code)
1900
1901    def test_urandom_fd_closed(self):
1902        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1903        # closed.
1904        code = """if 1:
1905            import os
1906            import sys
1907            import test.support
1908            os.urandom(4)
1909            with test.support.SuppressCrashReport():
1910                os.closerange(3, 256)
1911            sys.stdout.buffer.write(os.urandom(4))
1912            """
1913        rc, out, err = assert_python_ok('-Sc', code)
1914
1915    def test_urandom_fd_reopened(self):
1916        # Issue #21207: urandom() should detect its fd to /dev/urandom
1917        # changed to something else, and reopen it.
1918        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1919        create_file(os_helper.TESTFN, b"x" * 256)
1920
1921        code = """if 1:
1922            import os
1923            import sys
1924            import test.support
1925            os.urandom(4)
1926            with test.support.SuppressCrashReport():
1927                for fd in range(3, 256):
1928                    try:
1929                        os.close(fd)
1930                    except OSError:
1931                        pass
1932                    else:
1933                        # Found the urandom fd (XXX hopefully)
1934                        break
1935                os.closerange(3, 256)
1936            with open({TESTFN!r}, 'rb') as f:
1937                new_fd = f.fileno()
1938                # Issue #26935: posix allows new_fd and fd to be equal but
1939                # some libc implementations have dup2 return an error in this
1940                # case.
1941                if new_fd != fd:
1942                    os.dup2(new_fd, fd)
1943                sys.stdout.buffer.write(os.urandom(4))
1944                sys.stdout.buffer.write(os.urandom(4))
1945            """.format(TESTFN=os_helper.TESTFN)
1946        rc, out, err = assert_python_ok('-Sc', code)
1947        self.assertEqual(len(out), 8)
1948        self.assertNotEqual(out[0:4], out[4:8])
1949        rc, out2, err2 = assert_python_ok('-Sc', code)
1950        self.assertEqual(len(out2), 8)
1951        self.assertNotEqual(out2, out)
1952
1953
1954@contextlib.contextmanager
1955def _execvpe_mockup(defpath=None):
1956    """
1957    Stubs out execv and execve functions when used as context manager.
1958    Records exec calls. The mock execv and execve functions always raise an
1959    exception as they would normally never return.
1960    """
1961    # A list of tuples containing (function name, first arg, args)
1962    # of calls to execv or execve that have been made.
1963    calls = []
1964
1965    def mock_execv(name, *args):
1966        calls.append(('execv', name, args))
1967        raise RuntimeError("execv called")
1968
1969    def mock_execve(name, *args):
1970        calls.append(('execve', name, args))
1971        raise OSError(errno.ENOTDIR, "execve called")
1972
1973    try:
1974        orig_execv = os.execv
1975        orig_execve = os.execve
1976        orig_defpath = os.defpath
1977        os.execv = mock_execv
1978        os.execve = mock_execve
1979        if defpath is not None:
1980            os.defpath = defpath
1981        yield calls
1982    finally:
1983        os.execv = orig_execv
1984        os.execve = orig_execve
1985        os.defpath = orig_defpath
1986
1987@unittest.skipUnless(hasattr(os, 'execv'),
1988                     "need os.execv()")
1989class ExecTests(unittest.TestCase):
1990    @unittest.skipIf(USING_LINUXTHREADS,
1991                     "avoid triggering a linuxthreads bug: see issue #4970")
1992    def test_execvpe_with_bad_program(self):
1993        self.assertRaises(OSError, os.execvpe, 'no such app-',
1994                          ['no such app-'], None)
1995
1996    def test_execv_with_bad_arglist(self):
1997        self.assertRaises(ValueError, os.execv, 'notepad', ())
1998        self.assertRaises(ValueError, os.execv, 'notepad', [])
1999        self.assertRaises(ValueError, os.execv, 'notepad', ('',))
2000        self.assertRaises(ValueError, os.execv, 'notepad', [''])
2001
2002    def test_execvpe_with_bad_arglist(self):
2003        self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
2004        self.assertRaises(ValueError, os.execvpe, 'notepad', [], {})
2005        self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {})
2006
2007    @unittest.skipUnless(hasattr(os, '_execvpe'),
2008                         "No internal os._execvpe function to test.")
2009    def _test_internal_execvpe(self, test_type):
2010        program_path = os.sep + 'absolutepath'
2011        if test_type is bytes:
2012            program = b'executable'
2013            fullpath = os.path.join(os.fsencode(program_path), program)
2014            native_fullpath = fullpath
2015            arguments = [b'progname', 'arg1', 'arg2']
2016        else:
2017            program = 'executable'
2018            arguments = ['progname', 'arg1', 'arg2']
2019            fullpath = os.path.join(program_path, program)
2020            if os.name != "nt":
2021                native_fullpath = os.fsencode(fullpath)
2022            else:
2023                native_fullpath = fullpath
2024        env = {'spam': 'beans'}
2025
2026        # test os._execvpe() with an absolute path
2027        with _execvpe_mockup() as calls:
2028            self.assertRaises(RuntimeError,
2029                os._execvpe, fullpath, arguments)
2030            self.assertEqual(len(calls), 1)
2031            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
2032
2033        # test os._execvpe() with a relative path:
2034        # os.get_exec_path() returns defpath
2035        with _execvpe_mockup(defpath=program_path) as calls:
2036            self.assertRaises(OSError,
2037                os._execvpe, program, arguments, env=env)
2038            self.assertEqual(len(calls), 1)
2039            self.assertSequenceEqual(calls[0],
2040                ('execve', native_fullpath, (arguments, env)))
2041
2042        # test os._execvpe() with a relative path:
2043        # os.get_exec_path() reads the 'PATH' variable
2044        with _execvpe_mockup() as calls:
2045            env_path = env.copy()
2046            if test_type is bytes:
2047                env_path[b'PATH'] = program_path
2048            else:
2049                env_path['PATH'] = program_path
2050            self.assertRaises(OSError,
2051                os._execvpe, program, arguments, env=env_path)
2052            self.assertEqual(len(calls), 1)
2053            self.assertSequenceEqual(calls[0],
2054                ('execve', native_fullpath, (arguments, env_path)))
2055
2056    def test_internal_execvpe_str(self):
2057        self._test_internal_execvpe(str)
2058        if os.name != "nt":
2059            self._test_internal_execvpe(bytes)
2060
2061    def test_execve_invalid_env(self):
2062        args = [sys.executable, '-c', 'pass']
2063
2064        # null character in the environment variable name
2065        newenv = os.environ.copy()
2066        newenv["FRUIT\0VEGETABLE"] = "cabbage"
2067        with self.assertRaises(ValueError):
2068            os.execve(args[0], args, newenv)
2069
2070        # null character in the environment variable value
2071        newenv = os.environ.copy()
2072        newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
2073        with self.assertRaises(ValueError):
2074            os.execve(args[0], args, newenv)
2075
2076        # equal character in the environment variable name
2077        newenv = os.environ.copy()
2078        newenv["FRUIT=ORANGE"] = "lemon"
2079        with self.assertRaises(ValueError):
2080            os.execve(args[0], args, newenv)
2081
2082    @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
2083    def test_execve_with_empty_path(self):
2084        # bpo-32890: Check GetLastError() misuse
2085        try:
2086            os.execve('', ['arg'], {})
2087        except OSError as e:
2088            self.assertTrue(e.winerror is None or e.winerror != 0)
2089        else:
2090            self.fail('No OSError raised')
2091
2092
2093@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2094class Win32ErrorTests(unittest.TestCase):
2095    def setUp(self):
2096        try:
2097            os.stat(os_helper.TESTFN)
2098        except FileNotFoundError:
2099            exists = False
2100        except OSError as exc:
2101            exists = True
2102            self.fail("file %s must not exist; os.stat failed with %s"
2103                      % (os_helper.TESTFN, exc))
2104        else:
2105            self.fail("file %s must not exist" % os_helper.TESTFN)
2106
2107    def test_rename(self):
2108        self.assertRaises(OSError, os.rename, os_helper.TESTFN, os_helper.TESTFN+".bak")
2109
2110    def test_remove(self):
2111        self.assertRaises(OSError, os.remove, os_helper.TESTFN)
2112
2113    def test_chdir(self):
2114        self.assertRaises(OSError, os.chdir, os_helper.TESTFN)
2115
2116    def test_mkdir(self):
2117        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
2118
2119        with open(os_helper.TESTFN, "x") as f:
2120            self.assertRaises(OSError, os.mkdir, os_helper.TESTFN)
2121
2122    def test_utime(self):
2123        self.assertRaises(OSError, os.utime, os_helper.TESTFN, None)
2124
2125    def test_chmod(self):
2126        self.assertRaises(OSError, os.chmod, os_helper.TESTFN, 0)
2127
2128
2129@unittest.skipIf(support.is_wasi, "Cannot create invalid FD on WASI.")
2130class TestInvalidFD(unittest.TestCase):
2131    singles = ["fchdir", "dup", "fdatasync", "fstat",
2132               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
2133    #singles.append("close")
2134    #We omit close because it doesn't raise an exception on some platforms
2135    def get_single(f):
2136        def helper(self):
2137            if  hasattr(os, f):
2138                self.check(getattr(os, f))
2139        return helper
2140    for f in singles:
2141        locals()["test_"+f] = get_single(f)
2142
2143    def check(self, f, *args, **kwargs):
2144        try:
2145            f(os_helper.make_bad_fd(), *args, **kwargs)
2146        except OSError as e:
2147            self.assertEqual(e.errno, errno.EBADF)
2148        else:
2149            self.fail("%r didn't raise an OSError with a bad file descriptor"
2150                      % f)
2151
2152    def test_fdopen(self):
2153        self.check(os.fdopen, encoding="utf-8")
2154
2155    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
2156    def test_isatty(self):
2157        self.assertEqual(os.isatty(os_helper.make_bad_fd()), False)
2158
2159    @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
2160    def test_closerange(self):
2161        fd = os_helper.make_bad_fd()
2162        # Make sure none of the descriptors we are about to close are
2163        # currently valid (issue 6542).
2164        for i in range(10):
2165            try: os.fstat(fd+i)
2166            except OSError:
2167                pass
2168            else:
2169                break
2170        if i < 2:
2171            raise unittest.SkipTest(
2172                "Unable to acquire a range of invalid file descriptors")
2173        self.assertEqual(os.closerange(fd, fd + i-1), None)
2174
2175    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
2176    def test_dup2(self):
2177        self.check(os.dup2, 20)
2178
2179    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
2180    @unittest.skipIf(
2181        support.is_emscripten,
2182        "dup2() with negative fds is broken on Emscripten (see gh-102179)"
2183    )
2184    def test_dup2_negative_fd(self):
2185        valid_fd = os.open(__file__, os.O_RDONLY)
2186        self.addCleanup(os.close, valid_fd)
2187        fds = [
2188            valid_fd,
2189            -1,
2190            -2**31,
2191        ]
2192        for fd, fd2 in itertools.product(fds, repeat=2):
2193            if fd != fd2:
2194                with self.subTest(fd=fd, fd2=fd2):
2195                    with self.assertRaises(OSError) as ctx:
2196                        os.dup2(fd, fd2)
2197                    self.assertEqual(ctx.exception.errno, errno.EBADF)
2198
2199    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
2200    def test_fchmod(self):
2201        self.check(os.fchmod, 0)
2202
2203    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
2204    def test_fchown(self):
2205        self.check(os.fchown, -1, -1)
2206
2207    @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
2208    @unittest.skipIf(
2209        support.is_emscripten or support.is_wasi,
2210        "musl libc issue on Emscripten/WASI, bpo-46390"
2211    )
2212    def test_fpathconf(self):
2213        self.check(os.pathconf, "PC_NAME_MAX")
2214        self.check(os.fpathconf, "PC_NAME_MAX")
2215
2216    @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
2217    def test_ftruncate(self):
2218        self.check(os.truncate, 0)
2219        self.check(os.ftruncate, 0)
2220
2221    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
2222    def test_lseek(self):
2223        self.check(os.lseek, 0, 0)
2224
2225    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
2226    def test_read(self):
2227        self.check(os.read, 1)
2228
2229    @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
2230    def test_readv(self):
2231        buf = bytearray(10)
2232        self.check(os.readv, [buf])
2233
2234    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
2235    def test_tcsetpgrpt(self):
2236        self.check(os.tcsetpgrp, 0)
2237
2238    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
2239    def test_write(self):
2240        self.check(os.write, b" ")
2241
2242    @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
2243    def test_writev(self):
2244        self.check(os.writev, [b'abc'])
2245
2246    @support.requires_subprocess()
2247    def test_inheritable(self):
2248        self.check(os.get_inheritable)
2249        self.check(os.set_inheritable, True)
2250
2251    @unittest.skipUnless(hasattr(os, 'get_blocking'),
2252                         'needs os.get_blocking() and os.set_blocking()')
2253    def test_blocking(self):
2254        self.check(os.get_blocking)
2255        self.check(os.set_blocking, True)
2256
2257
2258@unittest.skipUnless(hasattr(os, 'link'), 'requires os.link')
2259class LinkTests(unittest.TestCase):
2260    def setUp(self):
2261        self.file1 = os_helper.TESTFN
2262        self.file2 = os.path.join(os_helper.TESTFN + "2")
2263
2264    def tearDown(self):
2265        for file in (self.file1, self.file2):
2266            if os.path.exists(file):
2267                os.unlink(file)
2268
2269    def _test_link(self, file1, file2):
2270        create_file(file1)
2271
2272        try:
2273            os.link(file1, file2)
2274        except PermissionError as e:
2275            self.skipTest('os.link(): %s' % e)
2276        with open(file1, "rb") as f1, open(file2, "rb") as f2:
2277            self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
2278
2279    def test_link(self):
2280        self._test_link(self.file1, self.file2)
2281
2282    def test_link_bytes(self):
2283        self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
2284                        bytes(self.file2, sys.getfilesystemencoding()))
2285
2286    def test_unicode_name(self):
2287        try:
2288            os.fsencode("\xf1")
2289        except UnicodeError:
2290            raise unittest.SkipTest("Unable to encode for this platform.")
2291
2292        self.file1 += "\xf1"
2293        self.file2 = self.file1 + "2"
2294        self._test_link(self.file1, self.file2)
2295
2296@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
2297class PosixUidGidTests(unittest.TestCase):
2298    # uid_t and gid_t are 32-bit unsigned integers on Linux
2299    UID_OVERFLOW = (1 << 32)
2300    GID_OVERFLOW = (1 << 32)
2301
2302    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
2303    def test_setuid(self):
2304        if os.getuid() != 0:
2305            self.assertRaises(OSError, os.setuid, 0)
2306        self.assertRaises(TypeError, os.setuid, 'not an int')
2307        self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)
2308
2309    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
2310    def test_setgid(self):
2311        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2312            self.assertRaises(OSError, os.setgid, 0)
2313        self.assertRaises(TypeError, os.setgid, 'not an int')
2314        self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW)
2315
2316    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
2317    def test_seteuid(self):
2318        if os.getuid() != 0:
2319            self.assertRaises(OSError, os.seteuid, 0)
2320        self.assertRaises(TypeError, os.setegid, 'not an int')
2321        self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)
2322
2323    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
2324    def test_setegid(self):
2325        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2326            self.assertRaises(OSError, os.setegid, 0)
2327        self.assertRaises(TypeError, os.setegid, 'not an int')
2328        self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW)
2329
2330    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
2331    def test_setreuid(self):
2332        if os.getuid() != 0:
2333            self.assertRaises(OSError, os.setreuid, 0, 0)
2334        self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
2335        self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
2336        self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
2337        self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)
2338
2339    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
2340    @support.requires_subprocess()
2341    def test_setreuid_neg1(self):
2342        # Needs to accept -1.  We run this in a subprocess to avoid
2343        # altering the test runner's process state (issue8045).
2344        subprocess.check_call([
2345                sys.executable, '-c',
2346                'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
2347
2348    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2349    @support.requires_subprocess()
2350    def test_setregid(self):
2351        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2352            self.assertRaises(OSError, os.setregid, 0, 0)
2353        self.assertRaises(TypeError, os.setregid, 'not an int', 0)
2354        self.assertRaises(TypeError, os.setregid, 0, 'not an int')
2355        self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
2356        self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)
2357
2358    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2359    @support.requires_subprocess()
2360    def test_setregid_neg1(self):
2361        # Needs to accept -1.  We run this in a subprocess to avoid
2362        # altering the test runner's process state (issue8045).
2363        subprocess.check_call([
2364                sys.executable, '-c',
2365                'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
2366
2367@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
2368class Pep383Tests(unittest.TestCase):
2369    def setUp(self):
2370        if os_helper.TESTFN_UNENCODABLE:
2371            self.dir = os_helper.TESTFN_UNENCODABLE
2372        elif os_helper.TESTFN_NONASCII:
2373            self.dir = os_helper.TESTFN_NONASCII
2374        else:
2375            self.dir = os_helper.TESTFN
2376        self.bdir = os.fsencode(self.dir)
2377
2378        bytesfn = []
2379        def add_filename(fn):
2380            try:
2381                fn = os.fsencode(fn)
2382            except UnicodeEncodeError:
2383                return
2384            bytesfn.append(fn)
2385        add_filename(os_helper.TESTFN_UNICODE)
2386        if os_helper.TESTFN_UNENCODABLE:
2387            add_filename(os_helper.TESTFN_UNENCODABLE)
2388        if os_helper.TESTFN_NONASCII:
2389            add_filename(os_helper.TESTFN_NONASCII)
2390        if not bytesfn:
2391            self.skipTest("couldn't create any non-ascii filename")
2392
2393        self.unicodefn = set()
2394        os.mkdir(self.dir)
2395        try:
2396            for fn in bytesfn:
2397                os_helper.create_empty_file(os.path.join(self.bdir, fn))
2398                fn = os.fsdecode(fn)
2399                if fn in self.unicodefn:
2400                    raise ValueError("duplicate filename")
2401                self.unicodefn.add(fn)
2402        except:
2403            shutil.rmtree(self.dir)
2404            raise
2405
2406    def tearDown(self):
2407        shutil.rmtree(self.dir)
2408
2409    def test_listdir(self):
2410        expected = self.unicodefn
2411        found = set(os.listdir(self.dir))
2412        self.assertEqual(found, expected)
2413        # test listdir without arguments
2414        current_directory = os.getcwd()
2415        try:
2416            os.chdir(os.sep)
2417            self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
2418        finally:
2419            os.chdir(current_directory)
2420
2421    def test_open(self):
2422        for fn in self.unicodefn:
2423            f = open(os.path.join(self.dir, fn), 'rb')
2424            f.close()
2425
2426    @unittest.skipUnless(hasattr(os, 'statvfs'),
2427                            "need os.statvfs()")
2428    def test_statvfs(self):
2429        # issue #9645
2430        for fn in self.unicodefn:
2431            # should not fail with file not found error
2432            fullname = os.path.join(self.dir, fn)
2433            os.statvfs(fullname)
2434
2435    def test_stat(self):
2436        for fn in self.unicodefn:
2437            os.stat(os.path.join(self.dir, fn))
2438
2439@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2440class Win32KillTests(unittest.TestCase):
2441    def _kill(self, sig):
2442        # Start sys.executable as a subprocess and communicate from the
2443        # subprocess to the parent that the interpreter is ready. When it
2444        # becomes ready, send *sig* via os.kill to the subprocess and check
2445        # that the return code is equal to *sig*.
2446        import ctypes
2447        from ctypes import wintypes
2448        import msvcrt
2449
2450        # Since we can't access the contents of the process' stdout until the
2451        # process has exited, use PeekNamedPipe to see what's inside stdout
2452        # without waiting. This is done so we can tell that the interpreter
2453        # is started and running at a point where it could handle a signal.
2454        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
2455        PeekNamedPipe.restype = wintypes.BOOL
2456        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
2457                                  ctypes.POINTER(ctypes.c_char), # stdout buf
2458                                  wintypes.DWORD, # Buffer size
2459                                  ctypes.POINTER(wintypes.DWORD), # bytes read
2460                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
2461                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
2462        msg = "running"
2463        proc = subprocess.Popen([sys.executable, "-c",
2464                                 "import sys;"
2465                                 "sys.stdout.write('{}');"
2466                                 "sys.stdout.flush();"
2467                                 "input()".format(msg)],
2468                                stdout=subprocess.PIPE,
2469                                stderr=subprocess.PIPE,
2470                                stdin=subprocess.PIPE)
2471        self.addCleanup(proc.stdout.close)
2472        self.addCleanup(proc.stderr.close)
2473        self.addCleanup(proc.stdin.close)
2474
2475        count, max = 0, 100
2476        while count < max and proc.poll() is None:
2477            # Create a string buffer to store the result of stdout from the pipe
2478            buf = ctypes.create_string_buffer(len(msg))
2479            # Obtain the text currently in proc.stdout
2480            # Bytes read/avail/left are left as NULL and unused
2481            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
2482                                 buf, ctypes.sizeof(buf), None, None, None)
2483            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
2484            if buf.value:
2485                self.assertEqual(msg, buf.value.decode())
2486                break
2487            time.sleep(0.1)
2488            count += 1
2489        else:
2490            self.fail("Did not receive communication from the subprocess")
2491
2492        os.kill(proc.pid, sig)
2493        self.assertEqual(proc.wait(), sig)
2494
2495    def test_kill_sigterm(self):
2496        # SIGTERM doesn't mean anything special, but make sure it works
2497        self._kill(signal.SIGTERM)
2498
2499    def test_kill_int(self):
2500        # os.kill on Windows can take an int which gets set as the exit code
2501        self._kill(100)
2502
2503    @unittest.skipIf(mmap is None, "requires mmap")
2504    def _kill_with_event(self, event, name):
2505        tagname = "test_os_%s" % uuid.uuid1()
2506        m = mmap.mmap(-1, 1, tagname)
2507        m[0] = 0
2508        # Run a script which has console control handling enabled.
2509        proc = subprocess.Popen([sys.executable,
2510                   os.path.join(os.path.dirname(__file__),
2511                                "win_console_handler.py"), tagname],
2512                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
2513        # Let the interpreter startup before we send signals. See #3137.
2514        count, max = 0, 100
2515        while count < max and proc.poll() is None:
2516            if m[0] == 1:
2517                break
2518            time.sleep(0.1)
2519            count += 1
2520        else:
2521            # Forcefully kill the process if we weren't able to signal it.
2522            os.kill(proc.pid, signal.SIGINT)
2523            self.fail("Subprocess didn't finish initialization")
2524        os.kill(proc.pid, event)
2525        # proc.send_signal(event) could also be done here.
2526        # Allow time for the signal to be passed and the process to exit.
2527        time.sleep(0.5)
2528        if not proc.poll():
2529            # Forcefully kill the process if we weren't able to signal it.
2530            os.kill(proc.pid, signal.SIGINT)
2531            self.fail("subprocess did not stop on {}".format(name))
2532
2533    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
2534    @support.requires_subprocess()
2535    def test_CTRL_C_EVENT(self):
2536        from ctypes import wintypes
2537        import ctypes
2538
2539        # Make a NULL value by creating a pointer with no argument.
2540        NULL = ctypes.POINTER(ctypes.c_int)()
2541        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
2542        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
2543                                          wintypes.BOOL)
2544        SetConsoleCtrlHandler.restype = wintypes.BOOL
2545
2546        # Calling this with NULL and FALSE causes the calling process to
2547        # handle Ctrl+C, rather than ignore it. This property is inherited
2548        # by subprocesses.
2549        SetConsoleCtrlHandler(NULL, 0)
2550
2551        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
2552
2553    @support.requires_subprocess()
2554    def test_CTRL_BREAK_EVENT(self):
2555        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2556
2557
2558@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2559class Win32ListdirTests(unittest.TestCase):
2560    """Test listdir on Windows."""
2561
2562    def setUp(self):
2563        self.created_paths = []
2564        for i in range(2):
2565            dir_name = 'SUB%d' % i
2566            dir_path = os.path.join(os_helper.TESTFN, dir_name)
2567            file_name = 'FILE%d' % i
2568            file_path = os.path.join(os_helper.TESTFN, file_name)
2569            os.makedirs(dir_path)
2570            with open(file_path, 'w', encoding='utf-8') as f:
2571                f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
2572            self.created_paths.extend([dir_name, file_name])
2573        self.created_paths.sort()
2574
2575    def tearDown(self):
2576        shutil.rmtree(os_helper.TESTFN)
2577
2578    def test_listdir_no_extended_path(self):
2579        """Test when the path is not an "extended" path."""
2580        # unicode
2581        self.assertEqual(
2582                sorted(os.listdir(os_helper.TESTFN)),
2583                self.created_paths)
2584
2585        # bytes
2586        self.assertEqual(
2587                sorted(os.listdir(os.fsencode(os_helper.TESTFN))),
2588                [os.fsencode(path) for path in self.created_paths])
2589
2590    def test_listdir_extended_path(self):
2591        """Test when the path starts with '\\\\?\\'."""
2592        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
2593        # unicode
2594        path = '\\\\?\\' + os.path.abspath(os_helper.TESTFN)
2595        self.assertEqual(
2596                sorted(os.listdir(path)),
2597                self.created_paths)
2598
2599        # bytes
2600        path = b'\\\\?\\' + os.fsencode(os.path.abspath(os_helper.TESTFN))
2601        self.assertEqual(
2602                sorted(os.listdir(path)),
2603                [os.fsencode(path) for path in self.created_paths])
2604
2605
2606@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
2607class ReadlinkTests(unittest.TestCase):
2608    filelink = 'readlinktest'
2609    filelink_target = os.path.abspath(__file__)
2610    filelinkb = os.fsencode(filelink)
2611    filelinkb_target = os.fsencode(filelink_target)
2612
2613    def assertPathEqual(self, left, right):
2614        left = os.path.normcase(left)
2615        right = os.path.normcase(right)
2616        if sys.platform == 'win32':
2617            # Bad practice to blindly strip the prefix as it may be required to
2618            # correctly refer to the file, but we're only comparing paths here.
2619            has_prefix = lambda p: p.startswith(
2620                b'\\\\?\\' if isinstance(p, bytes) else '\\\\?\\')
2621            if has_prefix(left):
2622                left = left[4:]
2623            if has_prefix(right):
2624                right = right[4:]
2625        self.assertEqual(left, right)
2626
2627    def setUp(self):
2628        self.assertTrue(os.path.exists(self.filelink_target))
2629        self.assertTrue(os.path.exists(self.filelinkb_target))
2630        self.assertFalse(os.path.exists(self.filelink))
2631        self.assertFalse(os.path.exists(self.filelinkb))
2632
2633    def test_not_symlink(self):
2634        filelink_target = FakePath(self.filelink_target)
2635        self.assertRaises(OSError, os.readlink, self.filelink_target)
2636        self.assertRaises(OSError, os.readlink, filelink_target)
2637
2638    def test_missing_link(self):
2639        self.assertRaises(FileNotFoundError, os.readlink, 'missing-link')
2640        self.assertRaises(FileNotFoundError, os.readlink,
2641                          FakePath('missing-link'))
2642
2643    @os_helper.skip_unless_symlink
2644    def test_pathlike(self):
2645        os.symlink(self.filelink_target, self.filelink)
2646        self.addCleanup(os_helper.unlink, self.filelink)
2647        filelink = FakePath(self.filelink)
2648        self.assertPathEqual(os.readlink(filelink), self.filelink_target)
2649
2650    @os_helper.skip_unless_symlink
2651    def test_pathlike_bytes(self):
2652        os.symlink(self.filelinkb_target, self.filelinkb)
2653        self.addCleanup(os_helper.unlink, self.filelinkb)
2654        path = os.readlink(FakePath(self.filelinkb))
2655        self.assertPathEqual(path, self.filelinkb_target)
2656        self.assertIsInstance(path, bytes)
2657
2658    @os_helper.skip_unless_symlink
2659    def test_bytes(self):
2660        os.symlink(self.filelinkb_target, self.filelinkb)
2661        self.addCleanup(os_helper.unlink, self.filelinkb)
2662        path = os.readlink(self.filelinkb)
2663        self.assertPathEqual(path, self.filelinkb_target)
2664        self.assertIsInstance(path, bytes)
2665
2666
2667@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2668@os_helper.skip_unless_symlink
2669class Win32SymlinkTests(unittest.TestCase):
2670    filelink = 'filelinktest'
2671    filelink_target = os.path.abspath(__file__)
2672    dirlink = 'dirlinktest'
2673    dirlink_target = os.path.dirname(filelink_target)
2674    missing_link = 'missing link'
2675
2676    def setUp(self):
2677        assert os.path.exists(self.dirlink_target)
2678        assert os.path.exists(self.filelink_target)
2679        assert not os.path.exists(self.dirlink)
2680        assert not os.path.exists(self.filelink)
2681        assert not os.path.exists(self.missing_link)
2682
2683    def tearDown(self):
2684        if os.path.exists(self.filelink):
2685            os.remove(self.filelink)
2686        if os.path.exists(self.dirlink):
2687            os.rmdir(self.dirlink)
2688        if os.path.lexists(self.missing_link):
2689            os.remove(self.missing_link)
2690
2691    def test_directory_link(self):
2692        os.symlink(self.dirlink_target, self.dirlink)
2693        self.assertTrue(os.path.exists(self.dirlink))
2694        self.assertTrue(os.path.isdir(self.dirlink))
2695        self.assertTrue(os.path.islink(self.dirlink))
2696        self.check_stat(self.dirlink, self.dirlink_target)
2697
2698    def test_file_link(self):
2699        os.symlink(self.filelink_target, self.filelink)
2700        self.assertTrue(os.path.exists(self.filelink))
2701        self.assertTrue(os.path.isfile(self.filelink))
2702        self.assertTrue(os.path.islink(self.filelink))
2703        self.check_stat(self.filelink, self.filelink_target)
2704
2705    def _create_missing_dir_link(self):
2706        'Create a "directory" link to a non-existent target'
2707        linkname = self.missing_link
2708        if os.path.lexists(linkname):
2709            os.remove(linkname)
2710        target = r'c:\\target does not exist.29r3c740'
2711        assert not os.path.exists(target)
2712        target_is_dir = True
2713        os.symlink(target, linkname, target_is_dir)
2714
2715    def test_remove_directory_link_to_missing_target(self):
2716        self._create_missing_dir_link()
2717        # For compatibility with Unix, os.remove will check the
2718        #  directory status and call RemoveDirectory if the symlink
2719        #  was created with target_is_dir==True.
2720        os.remove(self.missing_link)
2721
2722    def test_isdir_on_directory_link_to_missing_target(self):
2723        self._create_missing_dir_link()
2724        self.assertFalse(os.path.isdir(self.missing_link))
2725
2726    def test_rmdir_on_directory_link_to_missing_target(self):
2727        self._create_missing_dir_link()
2728        os.rmdir(self.missing_link)
2729
2730    def check_stat(self, link, target):
2731        self.assertEqual(os.stat(link), os.stat(target))
2732        self.assertNotEqual(os.lstat(link), os.stat(link))
2733
2734        bytes_link = os.fsencode(link)
2735        self.assertEqual(os.stat(bytes_link), os.stat(target))
2736        self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
2737
2738    def test_12084(self):
2739        level1 = os.path.abspath(os_helper.TESTFN)
2740        level2 = os.path.join(level1, "level2")
2741        level3 = os.path.join(level2, "level3")
2742        self.addCleanup(os_helper.rmtree, level1)
2743
2744        os.mkdir(level1)
2745        os.mkdir(level2)
2746        os.mkdir(level3)
2747
2748        file1 = os.path.abspath(os.path.join(level1, "file1"))
2749        create_file(file1)
2750
2751        orig_dir = os.getcwd()
2752        try:
2753            os.chdir(level2)
2754            link = os.path.join(level2, "link")
2755            os.symlink(os.path.relpath(file1), "link")
2756            self.assertIn("link", os.listdir(os.getcwd()))
2757
2758            # Check os.stat calls from the same dir as the link
2759            self.assertEqual(os.stat(file1), os.stat("link"))
2760
2761            # Check os.stat calls from a dir below the link
2762            os.chdir(level1)
2763            self.assertEqual(os.stat(file1),
2764                             os.stat(os.path.relpath(link)))
2765
2766            # Check os.stat calls from a dir above the link
2767            os.chdir(level3)
2768            self.assertEqual(os.stat(file1),
2769                             os.stat(os.path.relpath(link)))
2770        finally:
2771            os.chdir(orig_dir)
2772
2773    @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
2774                            and os.path.exists(r'C:\ProgramData'),
2775                            'Test directories not found')
2776    def test_29248(self):
2777        # os.symlink() calls CreateSymbolicLink, which creates
2778        # the reparse data buffer with the print name stored
2779        # first, so the offset is always 0. CreateSymbolicLink
2780        # stores the "PrintName" DOS path (e.g. "C:\") first,
2781        # with an offset of 0, followed by the "SubstituteName"
2782        # NT path (e.g. "\??\C:\"). The "All Users" link, on
2783        # the other hand, seems to have been created manually
2784        # with an inverted order.
2785        target = os.readlink(r'C:\Users\All Users')
2786        self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))
2787
2788    def test_buffer_overflow(self):
2789        # Older versions would have a buffer overflow when detecting
2790        # whether a link source was a directory. This test ensures we
2791        # no longer crash, but does not otherwise validate the behavior
2792        segment = 'X' * 27
2793        path = os.path.join(*[segment] * 10)
2794        test_cases = [
2795            # overflow with absolute src
2796            ('\\' + path, segment),
2797            # overflow dest with relative src
2798            (segment, path),
2799            # overflow when joining src
2800            (path[:180], path[:180]),
2801        ]
2802        for src, dest in test_cases:
2803            try:
2804                os.symlink(src, dest)
2805            except FileNotFoundError:
2806                pass
2807            else:
2808                try:
2809                    os.remove(dest)
2810                except OSError:
2811                    pass
2812            # Also test with bytes, since that is a separate code path.
2813            try:
2814                os.symlink(os.fsencode(src), os.fsencode(dest))
2815            except FileNotFoundError:
2816                pass
2817            else:
2818                try:
2819                    os.remove(dest)
2820                except OSError:
2821                    pass
2822
2823    def test_appexeclink(self):
2824        root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps')
2825        if not os.path.isdir(root):
2826            self.skipTest("test requires a WindowsApps directory")
2827
2828        aliases = [os.path.join(root, a)
2829                   for a in fnmatch.filter(os.listdir(root), '*.exe')]
2830
2831        for alias in aliases:
2832            if support.verbose:
2833                print()
2834                print("Testing with", alias)
2835            st = os.lstat(alias)
2836            self.assertEqual(st, os.stat(alias))
2837            self.assertFalse(stat.S_ISLNK(st.st_mode))
2838            self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK)
2839            # testing the first one we see is sufficient
2840            break
2841        else:
2842            self.skipTest("test requires an app execution alias")
2843
2844@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2845class Win32JunctionTests(unittest.TestCase):
2846    junction = 'junctiontest'
2847    junction_target = os.path.dirname(os.path.abspath(__file__))
2848
2849    def setUp(self):
2850        assert os.path.exists(self.junction_target)
2851        assert not os.path.lexists(self.junction)
2852
2853    def tearDown(self):
2854        if os.path.lexists(self.junction):
2855            os.unlink(self.junction)
2856
2857    def test_create_junction(self):
2858        _winapi.CreateJunction(self.junction_target, self.junction)
2859        self.assertTrue(os.path.lexists(self.junction))
2860        self.assertTrue(os.path.exists(self.junction))
2861        self.assertTrue(os.path.isdir(self.junction))
2862        self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction))
2863        self.assertEqual(os.stat(self.junction), os.stat(self.junction_target))
2864
2865        # bpo-37834: Junctions are not recognized as links.
2866        self.assertFalse(os.path.islink(self.junction))
2867        self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target),
2868                         os.path.normcase(os.readlink(self.junction)))
2869
2870    def test_unlink_removes_junction(self):
2871        _winapi.CreateJunction(self.junction_target, self.junction)
2872        self.assertTrue(os.path.exists(self.junction))
2873        self.assertTrue(os.path.lexists(self.junction))
2874
2875        os.unlink(self.junction)
2876        self.assertFalse(os.path.exists(self.junction))
2877
2878@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2879class Win32NtTests(unittest.TestCase):
2880    def test_getfinalpathname_handles(self):
2881        nt = import_helper.import_module('nt')
2882        ctypes = import_helper.import_module('ctypes')
2883        import ctypes.wintypes
2884
2885        kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
2886        kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE
2887
2888        kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL
2889        kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE,
2890                                                 ctypes.wintypes.LPDWORD)
2891
2892        # This is a pseudo-handle that doesn't need to be closed
2893        hproc = kernel.GetCurrentProcess()
2894
2895        handle_count = ctypes.wintypes.DWORD()
2896        ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2897        self.assertEqual(1, ok)
2898
2899        before_count = handle_count.value
2900
2901        # The first two test the error path, __file__ tests the success path
2902        filenames = [
2903            r'\\?\C:',
2904            r'\\?\NUL',
2905            r'\\?\CONIN',
2906            __file__,
2907        ]
2908
2909        for _ in range(10):
2910            for name in filenames:
2911                try:
2912                    nt._getfinalpathname(name)
2913                except Exception:
2914                    # Failure is expected
2915                    pass
2916                try:
2917                    os.stat(name)
2918                except Exception:
2919                    pass
2920
2921        ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2922        self.assertEqual(1, ok)
2923
2924        handle_delta = handle_count.value - before_count
2925
2926        self.assertEqual(0, handle_delta)
2927
2928    @support.requires_subprocess()
2929    def test_stat_unlink_race(self):
2930        # bpo-46785: the implementation of os.stat() falls back to reading
2931        # the parent directory if CreateFileW() fails with a permission
2932        # error. If reading the parent directory fails because the file or
2933        # directory are subsequently unlinked, or because the volume or
2934        # share are no longer available, then the original permission error
2935        # should not be restored.
2936        filename =  os_helper.TESTFN
2937        self.addCleanup(os_helper.unlink, filename)
2938        deadline = time.time() + 5
2939        command = textwrap.dedent("""\
2940            import os
2941            import sys
2942            import time
2943
2944            filename = sys.argv[1]
2945            deadline = float(sys.argv[2])
2946
2947            while time.time() < deadline:
2948                try:
2949                    with open(filename, "w") as f:
2950                        pass
2951                except OSError:
2952                    pass
2953                try:
2954                    os.remove(filename)
2955                except OSError:
2956                    pass
2957            """)
2958
2959        with subprocess.Popen([sys.executable, '-c', command, filename, str(deadline)]) as proc:
2960            while time.time() < deadline:
2961                try:
2962                    os.stat(filename)
2963                except FileNotFoundError as e:
2964                    assert e.winerror == 2  # ERROR_FILE_NOT_FOUND
2965            try:
2966                proc.wait(1)
2967            except subprocess.TimeoutExpired:
2968                proc.terminate()
2969
2970
2971@os_helper.skip_unless_symlink
2972class NonLocalSymlinkTests(unittest.TestCase):
2973
2974    def setUp(self):
2975        r"""
2976        Create this structure:
2977
2978        base
2979         \___ some_dir
2980        """
2981        os.makedirs('base/some_dir')
2982
2983    def tearDown(self):
2984        shutil.rmtree('base')
2985
2986    def test_directory_link_nonlocal(self):
2987        """
2988        The symlink target should resolve relative to the link, not relative
2989        to the current directory.
2990
2991        Then, link base/some_link -> base/some_dir and ensure that some_link
2992        is resolved as a directory.
2993
2994        In issue13772, it was discovered that directory detection failed if
2995        the symlink target was not specified relative to the current
2996        directory, which was a defect in the implementation.
2997        """
2998        src = os.path.join('base', 'some_link')
2999        os.symlink('some_dir', src)
3000        assert os.path.isdir(src)
3001
3002
3003class FSEncodingTests(unittest.TestCase):
3004    def test_nop(self):
3005        self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
3006        self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
3007
3008    def test_identity(self):
3009        # assert fsdecode(fsencode(x)) == x
3010        for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
3011            try:
3012                bytesfn = os.fsencode(fn)
3013            except UnicodeEncodeError:
3014                continue
3015            self.assertEqual(os.fsdecode(bytesfn), fn)
3016
3017
3018
3019class DeviceEncodingTests(unittest.TestCase):
3020
3021    def test_bad_fd(self):
3022        # Return None when an fd doesn't actually exist.
3023        self.assertIsNone(os.device_encoding(123456))
3024
3025    @unittest.skipUnless(os.isatty(0) and not win32_is_iot() and (sys.platform.startswith('win') or
3026            (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
3027            'test requires a tty and either Windows or nl_langinfo(CODESET)')
3028    @unittest.skipIf(
3029        support.is_emscripten, "Cannot get encoding of stdin on Emscripten"
3030    )
3031    def test_device_encoding(self):
3032        encoding = os.device_encoding(0)
3033        self.assertIsNotNone(encoding)
3034        self.assertTrue(codecs.lookup(encoding))
3035
3036
3037@support.requires_subprocess()
3038class PidTests(unittest.TestCase):
3039    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
3040    def test_getppid(self):
3041        p = subprocess.Popen([sys.executable, '-c',
3042                              'import os; print(os.getppid())'],
3043                             stdout=subprocess.PIPE)
3044        stdout, _ = p.communicate()
3045        # We are the parent of our subprocess
3046        self.assertEqual(int(stdout), os.getpid())
3047
3048    def check_waitpid(self, code, exitcode, callback=None):
3049        if sys.platform == 'win32':
3050            # On Windows, os.spawnv() simply joins arguments with spaces:
3051            # arguments need to be quoted
3052            args = [f'"{sys.executable}"', '-c', f'"{code}"']
3053        else:
3054            args = [sys.executable, '-c', code]
3055        pid = os.spawnv(os.P_NOWAIT, sys.executable, args)
3056
3057        if callback is not None:
3058            callback(pid)
3059
3060        # don't use support.wait_process() to test directly os.waitpid()
3061        # and os.waitstatus_to_exitcode()
3062        pid2, status = os.waitpid(pid, 0)
3063        self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)
3064        self.assertEqual(pid2, pid)
3065
3066    def test_waitpid(self):
3067        self.check_waitpid(code='pass', exitcode=0)
3068
3069    def test_waitstatus_to_exitcode(self):
3070        exitcode = 23
3071        code = f'import sys; sys.exit({exitcode})'
3072        self.check_waitpid(code, exitcode=exitcode)
3073
3074        with self.assertRaises(TypeError):
3075            os.waitstatus_to_exitcode(0.0)
3076
3077    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
3078    def test_waitpid_windows(self):
3079        # bpo-40138: test os.waitpid() and os.waitstatus_to_exitcode()
3080        # with exit code larger than INT_MAX.
3081        STATUS_CONTROL_C_EXIT = 0xC000013A
3082        code = f'import _winapi; _winapi.ExitProcess({STATUS_CONTROL_C_EXIT})'
3083        self.check_waitpid(code, exitcode=STATUS_CONTROL_C_EXIT)
3084
3085    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
3086    def test_waitstatus_to_exitcode_windows(self):
3087        max_exitcode = 2 ** 32 - 1
3088        for exitcode in (0, 1, 5, max_exitcode):
3089            self.assertEqual(os.waitstatus_to_exitcode(exitcode << 8),
3090                             exitcode)
3091
3092        # invalid values
3093        with self.assertRaises(ValueError):
3094            os.waitstatus_to_exitcode((max_exitcode + 1) << 8)
3095        with self.assertRaises(OverflowError):
3096            os.waitstatus_to_exitcode(-1)
3097
3098    # Skip the test on Windows
3099    @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'need signal.SIGKILL')
3100    def test_waitstatus_to_exitcode_kill(self):
3101        code = f'import time; time.sleep({support.LONG_TIMEOUT})'
3102        signum = signal.SIGKILL
3103
3104        def kill_process(pid):
3105            os.kill(pid, signum)
3106
3107        self.check_waitpid(code, exitcode=-signum, callback=kill_process)
3108
3109
3110@support.requires_subprocess()
3111class SpawnTests(unittest.TestCase):
3112    @staticmethod
3113    def quote_args(args):
3114        # On Windows, os.spawn* simply joins arguments with spaces:
3115        # arguments need to be quoted
3116        if os.name != 'nt':
3117            return args
3118        return [f'"{arg}"' if " " in arg.strip() else arg for arg in args]
3119
3120    def create_args(self, *, with_env=False, use_bytes=False):
3121        self.exitcode = 17
3122
3123        filename = os_helper.TESTFN
3124        self.addCleanup(os_helper.unlink, filename)
3125
3126        if not with_env:
3127            code = 'import sys; sys.exit(%s)' % self.exitcode
3128        else:
3129            self.env = dict(os.environ)
3130            # create an unique key
3131            self.key = str(uuid.uuid4())
3132            self.env[self.key] = self.key
3133            # read the variable from os.environ to check that it exists
3134            code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
3135                    % (self.key, self.exitcode))
3136
3137        with open(filename, "w", encoding="utf-8") as fp:
3138            fp.write(code)
3139
3140        program = sys.executable
3141        args = self.quote_args([program, filename])
3142        if use_bytes:
3143            program = os.fsencode(program)
3144            args = [os.fsencode(a) for a in args]
3145            self.env = {os.fsencode(k): os.fsencode(v)
3146                        for k, v in self.env.items()}
3147
3148        return program, args
3149
3150    @requires_os_func('spawnl')
3151    def test_spawnl(self):
3152        program, args = self.create_args()
3153        exitcode = os.spawnl(os.P_WAIT, program, *args)
3154        self.assertEqual(exitcode, self.exitcode)
3155
3156    @requires_os_func('spawnle')
3157    def test_spawnle(self):
3158        program, args = self.create_args(with_env=True)
3159        exitcode = os.spawnle(os.P_WAIT, program, *args, self.env)
3160        self.assertEqual(exitcode, self.exitcode)
3161
3162    @requires_os_func('spawnlp')
3163    def test_spawnlp(self):
3164        program, args = self.create_args()
3165        exitcode = os.spawnlp(os.P_WAIT, program, *args)
3166        self.assertEqual(exitcode, self.exitcode)
3167
3168    @requires_os_func('spawnlpe')
3169    def test_spawnlpe(self):
3170        program, args = self.create_args(with_env=True)
3171        exitcode = os.spawnlpe(os.P_WAIT, program, *args, self.env)
3172        self.assertEqual(exitcode, self.exitcode)
3173
3174    @requires_os_func('spawnv')
3175    def test_spawnv(self):
3176        program, args = self.create_args()
3177        exitcode = os.spawnv(os.P_WAIT, program, args)
3178        self.assertEqual(exitcode, self.exitcode)
3179
3180        # Test for PyUnicode_FSConverter()
3181        exitcode = os.spawnv(os.P_WAIT, FakePath(program), args)
3182        self.assertEqual(exitcode, self.exitcode)
3183
3184    @requires_os_func('spawnve')
3185    def test_spawnve(self):
3186        program, args = self.create_args(with_env=True)
3187        exitcode = os.spawnve(os.P_WAIT, program, args, self.env)
3188        self.assertEqual(exitcode, self.exitcode)
3189
3190    @requires_os_func('spawnvp')
3191    def test_spawnvp(self):
3192        program, args = self.create_args()
3193        exitcode = os.spawnvp(os.P_WAIT, program, args)
3194        self.assertEqual(exitcode, self.exitcode)
3195
3196    @requires_os_func('spawnvpe')
3197    def test_spawnvpe(self):
3198        program, args = self.create_args(with_env=True)
3199        exitcode = os.spawnvpe(os.P_WAIT, program, args, self.env)
3200        self.assertEqual(exitcode, self.exitcode)
3201
3202    @requires_os_func('spawnv')
3203    def test_nowait(self):
3204        program, args = self.create_args()
3205        pid = os.spawnv(os.P_NOWAIT, program, args)
3206        support.wait_process(pid, exitcode=self.exitcode)
3207
3208    @requires_os_func('spawnve')
3209    def test_spawnve_bytes(self):
3210        # Test bytes handling in parse_arglist and parse_envlist (#28114)
3211        program, args = self.create_args(with_env=True, use_bytes=True)
3212        exitcode = os.spawnve(os.P_WAIT, program, args, self.env)
3213        self.assertEqual(exitcode, self.exitcode)
3214
3215    @requires_os_func('spawnl')
3216    def test_spawnl_noargs(self):
3217        program, __ = self.create_args()
3218        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, program)
3219        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, program, '')
3220
3221    @requires_os_func('spawnle')
3222    def test_spawnle_noargs(self):
3223        program, __ = self.create_args()
3224        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, program, {})
3225        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, program, '', {})
3226
3227    @requires_os_func('spawnv')
3228    def test_spawnv_noargs(self):
3229        program, __ = self.create_args()
3230        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, program, ())
3231        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, program, [])
3232        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, program, ('',))
3233        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, program, [''])
3234
3235    @requires_os_func('spawnve')
3236    def test_spawnve_noargs(self):
3237        program, __ = self.create_args()
3238        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, program, (), {})
3239        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, program, [], {})
3240        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, program, ('',), {})
3241        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, program, [''], {})
3242
3243    def _test_invalid_env(self, spawn):
3244        program = sys.executable
3245        args = self.quote_args([program, '-c', 'pass'])
3246
3247        # null character in the environment variable name
3248        newenv = os.environ.copy()
3249        newenv["FRUIT\0VEGETABLE"] = "cabbage"
3250        try:
3251            exitcode = spawn(os.P_WAIT, program, args, newenv)
3252        except ValueError:
3253            pass
3254        else:
3255            self.assertEqual(exitcode, 127)
3256
3257        # null character in the environment variable value
3258        newenv = os.environ.copy()
3259        newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
3260        try:
3261            exitcode = spawn(os.P_WAIT, program, args, newenv)
3262        except ValueError:
3263            pass
3264        else:
3265            self.assertEqual(exitcode, 127)
3266
3267        # equal character in the environment variable name
3268        newenv = os.environ.copy()
3269        newenv["FRUIT=ORANGE"] = "lemon"
3270        try:
3271            exitcode = spawn(os.P_WAIT, program, args, newenv)
3272        except ValueError:
3273            pass
3274        else:
3275            self.assertEqual(exitcode, 127)
3276
3277        # equal character in the environment variable value
3278        filename = os_helper.TESTFN
3279        self.addCleanup(os_helper.unlink, filename)
3280        with open(filename, "w", encoding="utf-8") as fp:
3281            fp.write('import sys, os\n'
3282                     'if os.getenv("FRUIT") != "orange=lemon":\n'
3283                     '    raise AssertionError')
3284
3285        args = self.quote_args([program, filename])
3286        newenv = os.environ.copy()
3287        newenv["FRUIT"] = "orange=lemon"
3288        exitcode = spawn(os.P_WAIT, program, args, newenv)
3289        self.assertEqual(exitcode, 0)
3290
3291    @requires_os_func('spawnve')
3292    def test_spawnve_invalid_env(self):
3293        self._test_invalid_env(os.spawnve)
3294
3295    @requires_os_func('spawnvpe')
3296    def test_spawnvpe_invalid_env(self):
3297        self._test_invalid_env(os.spawnvpe)
3298
3299
3300# The introduction of this TestCase caused at least two different errors on
3301# *nix buildbots. Temporarily skip this to let the buildbots move along.
3302@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
3303@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
3304class LoginTests(unittest.TestCase):
3305    def test_getlogin(self):
3306        user_name = os.getlogin()
3307        self.assertNotEqual(len(user_name), 0)
3308
3309
3310@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
3311                     "needs os.getpriority and os.setpriority")
3312class ProgramPriorityTests(unittest.TestCase):
3313    """Tests for os.getpriority() and os.setpriority()."""
3314
3315    def test_set_get_priority(self):
3316
3317        base = os.getpriority(os.PRIO_PROCESS, os.getpid())
3318        os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
3319        try:
3320            new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
3321            if base >= 19 and new_prio <= 19:
3322                raise unittest.SkipTest("unable to reliably test setpriority "
3323                                        "at current nice level of %s" % base)
3324            else:
3325                self.assertEqual(new_prio, base + 1)
3326        finally:
3327            try:
3328                os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
3329            except OSError as err:
3330                if err.errno != errno.EACCES:
3331                    raise
3332
3333
3334@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
3335class TestSendfile(unittest.IsolatedAsyncioTestCase):
3336
3337    DATA = b"12345abcde" * 16 * 1024  # 160 KiB
3338    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
3339                               not sys.platform.startswith("solaris") and \
3340                               not sys.platform.startswith("sunos")
3341    requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
3342            'requires headers and trailers support')
3343    requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
3344            'test is only meaningful on 32-bit builds')
3345
3346    @classmethod
3347    def setUpClass(cls):
3348        create_file(os_helper.TESTFN, cls.DATA)
3349
3350    @classmethod
3351    def tearDownClass(cls):
3352        os_helper.unlink(os_helper.TESTFN)
3353
3354    @staticmethod
3355    async def chunks(reader):
3356        while not reader.at_eof():
3357            yield await reader.read()
3358
3359    async def handle_new_client(self, reader, writer):
3360        self.server_buffer = b''.join([x async for x in self.chunks(reader)])
3361        writer.close()
3362        self.server.close()  # The test server processes a single client only
3363
3364    async def asyncSetUp(self):
3365        self.server_buffer = b''
3366        self.server = await asyncio.start_server(self.handle_new_client,
3367                                                 socket_helper.HOSTv4)
3368        server_name = self.server.sockets[0].getsockname()
3369        self.client = socket.socket()
3370        self.client.setblocking(False)
3371        await asyncio.get_running_loop().sock_connect(self.client, server_name)
3372        self.sockno = self.client.fileno()
3373        self.file = open(os_helper.TESTFN, 'rb')
3374        self.fileno = self.file.fileno()
3375
3376    async def asyncTearDown(self):
3377        self.file.close()
3378        self.client.close()
3379        await self.server.wait_closed()
3380
3381    # Use the test subject instead of asyncio.loop.sendfile
3382    @staticmethod
3383    async def async_sendfile(*args, **kwargs):
3384        return await asyncio.to_thread(os.sendfile, *args, **kwargs)
3385
3386    @staticmethod
3387    async def sendfile_wrapper(*args, **kwargs):
3388        """A higher level wrapper representing how an application is
3389        supposed to use sendfile().
3390        """
3391        while True:
3392            try:
3393                return await TestSendfile.async_sendfile(*args, **kwargs)
3394            except OSError as err:
3395                if err.errno == errno.ECONNRESET:
3396                    # disconnected
3397                    raise
3398                elif err.errno in (errno.EAGAIN, errno.EBUSY):
3399                    # we have to retry send data
3400                    continue
3401                else:
3402                    raise
3403
3404    async def test_send_whole_file(self):
3405        # normal send
3406        total_sent = 0
3407        offset = 0
3408        nbytes = 4096
3409        while total_sent < len(self.DATA):
3410            sent = await self.sendfile_wrapper(self.sockno, self.fileno,
3411                                               offset, nbytes)
3412            if sent == 0:
3413                break
3414            offset += sent
3415            total_sent += sent
3416            self.assertTrue(sent <= nbytes)
3417            self.assertEqual(offset, total_sent)
3418
3419        self.assertEqual(total_sent, len(self.DATA))
3420        self.client.shutdown(socket.SHUT_RDWR)
3421        self.client.close()
3422        await self.server.wait_closed()
3423        self.assertEqual(len(self.server_buffer), len(self.DATA))
3424        self.assertEqual(self.server_buffer, self.DATA)
3425
3426    async def test_send_at_certain_offset(self):
3427        # start sending a file at a certain offset
3428        total_sent = 0
3429        offset = len(self.DATA) // 2
3430        must_send = len(self.DATA) - offset
3431        nbytes = 4096
3432        while total_sent < must_send:
3433            sent = await self.sendfile_wrapper(self.sockno, self.fileno,
3434                                               offset, nbytes)
3435            if sent == 0:
3436                break
3437            offset += sent
3438            total_sent += sent
3439            self.assertTrue(sent <= nbytes)
3440
3441        self.client.shutdown(socket.SHUT_RDWR)
3442        self.client.close()
3443        await self.server.wait_closed()
3444        expected = self.DATA[len(self.DATA) // 2:]
3445        self.assertEqual(total_sent, len(expected))
3446        self.assertEqual(len(self.server_buffer), len(expected))
3447        self.assertEqual(self.server_buffer, expected)
3448
3449    async def test_offset_overflow(self):
3450        # specify an offset > file size
3451        offset = len(self.DATA) + 4096
3452        try:
3453            sent = await self.async_sendfile(self.sockno, self.fileno,
3454                                             offset, 4096)
3455        except OSError as e:
3456            # Solaris can raise EINVAL if offset >= file length, ignore.
3457            if e.errno != errno.EINVAL:
3458                raise
3459        else:
3460            self.assertEqual(sent, 0)
3461        self.client.shutdown(socket.SHUT_RDWR)
3462        self.client.close()
3463        await self.server.wait_closed()
3464        self.assertEqual(self.server_buffer, b'')
3465
3466    async def test_invalid_offset(self):
3467        with self.assertRaises(OSError) as cm:
3468            await self.async_sendfile(self.sockno, self.fileno, -1, 4096)
3469        self.assertEqual(cm.exception.errno, errno.EINVAL)
3470
3471    async def test_keywords(self):
3472        # Keyword arguments should be supported
3473        await self.async_sendfile(out_fd=self.sockno, in_fd=self.fileno,
3474                                  offset=0, count=4096)
3475        if self.SUPPORT_HEADERS_TRAILERS:
3476            await self.async_sendfile(out_fd=self.sockno, in_fd=self.fileno,
3477                                      offset=0, count=4096,
3478                                      headers=(), trailers=(), flags=0)
3479
3480    # --- headers / trailers tests
3481
3482    @requires_headers_trailers
3483    async def test_headers(self):
3484        total_sent = 0
3485        expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
3486        sent = await self.async_sendfile(self.sockno, self.fileno, 0, 4096,
3487                                         headers=[b"x" * 512, b"y" * 256])
3488        self.assertLessEqual(sent, 512 + 256 + 4096)
3489        total_sent += sent
3490        offset = 4096
3491        while total_sent < len(expected_data):
3492            nbytes = min(len(expected_data) - total_sent, 4096)
3493            sent = await self.sendfile_wrapper(self.sockno, self.fileno,
3494                                               offset, nbytes)
3495            if sent == 0:
3496                break
3497            self.assertLessEqual(sent, nbytes)
3498            total_sent += sent
3499            offset += sent
3500
3501        self.assertEqual(total_sent, len(expected_data))
3502        self.client.close()
3503        await self.server.wait_closed()
3504        self.assertEqual(hash(self.server_buffer), hash(expected_data))
3505
3506    @requires_headers_trailers
3507    async def test_trailers(self):
3508        TESTFN2 = os_helper.TESTFN + "2"
3509        file_data = b"abcdef"
3510
3511        self.addCleanup(os_helper.unlink, TESTFN2)
3512        create_file(TESTFN2, file_data)
3513
3514        with open(TESTFN2, 'rb') as f:
3515            await self.async_sendfile(self.sockno, f.fileno(), 0, 5,
3516                                      trailers=[b"123456", b"789"])
3517            self.client.close()
3518            await self.server.wait_closed()
3519            self.assertEqual(self.server_buffer, b"abcde123456789")
3520
3521    @requires_headers_trailers
3522    @requires_32b
3523    async def test_headers_overflow_32bits(self):
3524        self.server.handler_instance.accumulate = False
3525        with self.assertRaises(OSError) as cm:
3526            await self.async_sendfile(self.sockno, self.fileno, 0, 0,
3527                                      headers=[b"x" * 2**16] * 2**15)
3528        self.assertEqual(cm.exception.errno, errno.EINVAL)
3529
3530    @requires_headers_trailers
3531    @requires_32b
3532    async def test_trailers_overflow_32bits(self):
3533        self.server.handler_instance.accumulate = False
3534        with self.assertRaises(OSError) as cm:
3535            await self.async_sendfile(self.sockno, self.fileno, 0, 0,
3536                                      trailers=[b"x" * 2**16] * 2**15)
3537        self.assertEqual(cm.exception.errno, errno.EINVAL)
3538
3539    @requires_headers_trailers
3540    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
3541                         'test needs os.SF_NODISKIO')
3542    async def test_flags(self):
3543        try:
3544            await self.async_sendfile(self.sockno, self.fileno, 0, 4096,
3545                                      flags=os.SF_NODISKIO)
3546        except OSError as err:
3547            if err.errno not in (errno.EBUSY, errno.EAGAIN):
3548                raise
3549
3550
3551def supports_extended_attributes():
3552    if not hasattr(os, "setxattr"):
3553        return False
3554
3555    try:
3556        with open(os_helper.TESTFN, "xb", 0) as fp:
3557            try:
3558                os.setxattr(fp.fileno(), b"user.test", b"")
3559            except OSError:
3560                return False
3561    finally:
3562        os_helper.unlink(os_helper.TESTFN)
3563
3564    return True
3565
3566
3567@unittest.skipUnless(supports_extended_attributes(),
3568                     "no non-broken extended attribute support")
3569# Kernels < 2.6.39 don't respect setxattr flags.
3570@support.requires_linux_version(2, 6, 39)
3571class ExtendedAttributeTests(unittest.TestCase):
3572
3573    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
3574        fn = os_helper.TESTFN
3575        self.addCleanup(os_helper.unlink, fn)
3576        create_file(fn)
3577
3578        with self.assertRaises(OSError) as cm:
3579            getxattr(fn, s("user.test"), **kwargs)
3580        self.assertEqual(cm.exception.errno, errno.ENODATA)
3581
3582        init_xattr = listxattr(fn)
3583        self.assertIsInstance(init_xattr, list)
3584
3585        setxattr(fn, s("user.test"), b"", **kwargs)
3586        xattr = set(init_xattr)
3587        xattr.add("user.test")
3588        self.assertEqual(set(listxattr(fn)), xattr)
3589        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
3590        setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
3591        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
3592
3593        with self.assertRaises(OSError) as cm:
3594            setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
3595        self.assertEqual(cm.exception.errno, errno.EEXIST)
3596
3597        with self.assertRaises(OSError) as cm:
3598            setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
3599        self.assertEqual(cm.exception.errno, errno.ENODATA)
3600
3601        setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
3602        xattr.add("user.test2")
3603        self.assertEqual(set(listxattr(fn)), xattr)
3604        removexattr(fn, s("user.test"), **kwargs)
3605
3606        with self.assertRaises(OSError) as cm:
3607            getxattr(fn, s("user.test"), **kwargs)
3608        self.assertEqual(cm.exception.errno, errno.ENODATA)
3609
3610        xattr.remove("user.test")
3611        self.assertEqual(set(listxattr(fn)), xattr)
3612        self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
3613        setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
3614        self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
3615        removexattr(fn, s("user.test"), **kwargs)
3616        many = sorted("user.test{}".format(i) for i in range(100))
3617        for thing in many:
3618            setxattr(fn, thing, b"x", **kwargs)
3619        self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
3620
3621    def _check_xattrs(self, *args, **kwargs):
3622        self._check_xattrs_str(str, *args, **kwargs)
3623        os_helper.unlink(os_helper.TESTFN)
3624
3625        self._check_xattrs_str(os.fsencode, *args, **kwargs)
3626        os_helper.unlink(os_helper.TESTFN)
3627
3628    def test_simple(self):
3629        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3630                           os.listxattr)
3631
3632    def test_lpath(self):
3633        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3634                           os.listxattr, follow_symlinks=False)
3635
3636    def test_fds(self):
3637        def getxattr(path, *args):
3638            with open(path, "rb") as fp:
3639                return os.getxattr(fp.fileno(), *args)
3640        def setxattr(path, *args):
3641            with open(path, "wb", 0) as fp:
3642                os.setxattr(fp.fileno(), *args)
3643        def removexattr(path, *args):
3644            with open(path, "wb", 0) as fp:
3645                os.removexattr(fp.fileno(), *args)
3646        def listxattr(path, *args):
3647            with open(path, "rb") as fp:
3648                return os.listxattr(fp.fileno(), *args)
3649        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
3650
3651
3652@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
3653class TermsizeTests(unittest.TestCase):
3654    def test_does_not_crash(self):
3655        """Check if get_terminal_size() returns a meaningful value.
3656
3657        There's no easy portable way to actually check the size of the
3658        terminal, so let's check if it returns something sensible instead.
3659        """
3660        try:
3661            size = os.get_terminal_size()
3662        except OSError as e:
3663            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
3664                # Under win32 a generic OSError can be thrown if the
3665                # handle cannot be retrieved
3666                self.skipTest("failed to query terminal size")
3667            raise
3668
3669        self.assertGreaterEqual(size.columns, 0)
3670        self.assertGreaterEqual(size.lines, 0)
3671
3672    def test_stty_match(self):
3673        """Check if stty returns the same results
3674
3675        stty actually tests stdin, so get_terminal_size is invoked on
3676        stdin explicitly. If stty succeeded, then get_terminal_size()
3677        should work too.
3678        """
3679        try:
3680            size = (
3681                subprocess.check_output(
3682                    ["stty", "size"], stderr=subprocess.DEVNULL, text=True
3683                ).split()
3684            )
3685        except (FileNotFoundError, subprocess.CalledProcessError,
3686                PermissionError):
3687            self.skipTest("stty invocation failed")
3688        expected = (int(size[1]), int(size[0])) # reversed order
3689
3690        try:
3691            actual = os.get_terminal_size(sys.__stdin__.fileno())
3692        except OSError as e:
3693            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
3694                # Under win32 a generic OSError can be thrown if the
3695                # handle cannot be retrieved
3696                self.skipTest("failed to query terminal size")
3697            raise
3698        self.assertEqual(expected, actual)
3699
3700
3701@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create')
3702@support.requires_linux_version(3, 17)
3703class MemfdCreateTests(unittest.TestCase):
3704    def test_memfd_create(self):
3705        fd = os.memfd_create("Hi", os.MFD_CLOEXEC)
3706        self.assertNotEqual(fd, -1)
3707        self.addCleanup(os.close, fd)
3708        self.assertFalse(os.get_inheritable(fd))
3709        with open(fd, "wb", closefd=False) as f:
3710            f.write(b'memfd_create')
3711            self.assertEqual(f.tell(), 12)
3712
3713        fd2 = os.memfd_create("Hi")
3714        self.addCleanup(os.close, fd2)
3715        self.assertFalse(os.get_inheritable(fd2))
3716
3717
3718@unittest.skipUnless(hasattr(os, 'eventfd'), 'requires os.eventfd')
3719@support.requires_linux_version(2, 6, 30)
3720class EventfdTests(unittest.TestCase):
3721    def test_eventfd_initval(self):
3722        def pack(value):
3723            """Pack as native uint64_t
3724            """
3725            return struct.pack("@Q", value)
3726        size = 8  # read/write 8 bytes
3727        initval = 42
3728        fd = os.eventfd(initval)
3729        self.assertNotEqual(fd, -1)
3730        self.addCleanup(os.close, fd)
3731        self.assertFalse(os.get_inheritable(fd))
3732
3733        # test with raw read/write
3734        res = os.read(fd, size)
3735        self.assertEqual(res, pack(initval))
3736
3737        os.write(fd, pack(23))
3738        res = os.read(fd, size)
3739        self.assertEqual(res, pack(23))
3740
3741        os.write(fd, pack(40))
3742        os.write(fd, pack(2))
3743        res = os.read(fd, size)
3744        self.assertEqual(res, pack(42))
3745
3746        # test with eventfd_read/eventfd_write
3747        os.eventfd_write(fd, 20)
3748        os.eventfd_write(fd, 3)
3749        res = os.eventfd_read(fd)
3750        self.assertEqual(res, 23)
3751
3752    def test_eventfd_semaphore(self):
3753        initval = 2
3754        flags = os.EFD_CLOEXEC | os.EFD_SEMAPHORE | os.EFD_NONBLOCK
3755        fd = os.eventfd(initval, flags)
3756        self.assertNotEqual(fd, -1)
3757        self.addCleanup(os.close, fd)
3758
3759        # semaphore starts has initval 2, two reads return '1'
3760        res = os.eventfd_read(fd)
3761        self.assertEqual(res, 1)
3762        res = os.eventfd_read(fd)
3763        self.assertEqual(res, 1)
3764        # third read would block
3765        with self.assertRaises(BlockingIOError):
3766            os.eventfd_read(fd)
3767        with self.assertRaises(BlockingIOError):
3768            os.read(fd, 8)
3769
3770        # increase semaphore counter, read one
3771        os.eventfd_write(fd, 1)
3772        res = os.eventfd_read(fd)
3773        self.assertEqual(res, 1)
3774        # next read would block, too
3775        with self.assertRaises(BlockingIOError):
3776            os.eventfd_read(fd)
3777
3778    def test_eventfd_select(self):
3779        flags = os.EFD_CLOEXEC | os.EFD_NONBLOCK
3780        fd = os.eventfd(0, flags)
3781        self.assertNotEqual(fd, -1)
3782        self.addCleanup(os.close, fd)
3783
3784        # counter is zero, only writeable
3785        rfd, wfd, xfd = select.select([fd], [fd], [fd], 0)
3786        self.assertEqual((rfd, wfd, xfd), ([], [fd], []))
3787
3788        # counter is non-zero, read and writeable
3789        os.eventfd_write(fd, 23)
3790        rfd, wfd, xfd = select.select([fd], [fd], [fd], 0)
3791        self.assertEqual((rfd, wfd, xfd), ([fd], [fd], []))
3792        self.assertEqual(os.eventfd_read(fd), 23)
3793
3794        # counter at max, only readable
3795        os.eventfd_write(fd, (2**64) - 2)
3796        rfd, wfd, xfd = select.select([fd], [fd], [fd], 0)
3797        self.assertEqual((rfd, wfd, xfd), ([fd], [], []))
3798        os.eventfd_read(fd)
3799
3800
3801class OSErrorTests(unittest.TestCase):
3802    def setUp(self):
3803        class Str(str):
3804            pass
3805
3806        self.bytes_filenames = []
3807        self.unicode_filenames = []
3808        if os_helper.TESTFN_UNENCODABLE is not None:
3809            decoded = os_helper.TESTFN_UNENCODABLE
3810        else:
3811            decoded = os_helper.TESTFN
3812        self.unicode_filenames.append(decoded)
3813        self.unicode_filenames.append(Str(decoded))
3814        if os_helper.TESTFN_UNDECODABLE is not None:
3815            encoded = os_helper.TESTFN_UNDECODABLE
3816        else:
3817            encoded = os.fsencode(os_helper.TESTFN)
3818        self.bytes_filenames.append(encoded)
3819        self.bytes_filenames.append(bytearray(encoded))
3820        self.bytes_filenames.append(memoryview(encoded))
3821
3822        self.filenames = self.bytes_filenames + self.unicode_filenames
3823
3824    def test_oserror_filename(self):
3825        funcs = [
3826            (self.filenames, os.chdir,),
3827            (self.filenames, os.lstat,),
3828            (self.filenames, os.open, os.O_RDONLY),
3829            (self.filenames, os.rmdir,),
3830            (self.filenames, os.stat,),
3831            (self.filenames, os.unlink,),
3832        ]
3833        if sys.platform == "win32":
3834            funcs.extend((
3835                (self.bytes_filenames, os.rename, b"dst"),
3836                (self.bytes_filenames, os.replace, b"dst"),
3837                (self.unicode_filenames, os.rename, "dst"),
3838                (self.unicode_filenames, os.replace, "dst"),
3839                (self.unicode_filenames, os.listdir, ),
3840            ))
3841        else:
3842            funcs.extend((
3843                (self.filenames, os.listdir,),
3844                (self.filenames, os.rename, "dst"),
3845                (self.filenames, os.replace, "dst"),
3846            ))
3847        if os_helper.can_chmod():
3848            funcs.append((self.filenames, os.chmod, 0o777))
3849        if hasattr(os, "chown"):
3850            funcs.append((self.filenames, os.chown, 0, 0))
3851        if hasattr(os, "lchown"):
3852            funcs.append((self.filenames, os.lchown, 0, 0))
3853        if hasattr(os, "truncate"):
3854            funcs.append((self.filenames, os.truncate, 0))
3855        if hasattr(os, "chflags"):
3856            funcs.append((self.filenames, os.chflags, 0))
3857        if hasattr(os, "lchflags"):
3858            funcs.append((self.filenames, os.lchflags, 0))
3859        if hasattr(os, "chroot"):
3860            funcs.append((self.filenames, os.chroot,))
3861        if hasattr(os, "link"):
3862            if sys.platform == "win32":
3863                funcs.append((self.bytes_filenames, os.link, b"dst"))
3864                funcs.append((self.unicode_filenames, os.link, "dst"))
3865            else:
3866                funcs.append((self.filenames, os.link, "dst"))
3867        if hasattr(os, "listxattr"):
3868            funcs.extend((
3869                (self.filenames, os.listxattr,),
3870                (self.filenames, os.getxattr, "user.test"),
3871                (self.filenames, os.setxattr, "user.test", b'user'),
3872                (self.filenames, os.removexattr, "user.test"),
3873            ))
3874        if hasattr(os, "lchmod"):
3875            funcs.append((self.filenames, os.lchmod, 0o777))
3876        if hasattr(os, "readlink"):
3877            funcs.append((self.filenames, os.readlink,))
3878
3879
3880        for filenames, func, *func_args in funcs:
3881            for name in filenames:
3882                try:
3883                    if isinstance(name, (str, bytes)):
3884                        func(name, *func_args)
3885                    else:
3886                        with self.assertWarnsRegex(DeprecationWarning, 'should be'):
3887                            func(name, *func_args)
3888                except OSError as err:
3889                    self.assertIs(err.filename, name, str(func))
3890                except UnicodeDecodeError:
3891                    pass
3892                else:
3893                    self.fail("No exception thrown by {}".format(func))
3894
3895class CPUCountTests(unittest.TestCase):
3896    def test_cpu_count(self):
3897        cpus = os.cpu_count()
3898        if cpus is not None:
3899            self.assertIsInstance(cpus, int)
3900            self.assertGreater(cpus, 0)
3901        else:
3902            self.skipTest("Could not determine the number of CPUs")
3903
3904
3905# FD inheritance check is only useful for systems with process support.
3906@support.requires_subprocess()
3907class FDInheritanceTests(unittest.TestCase):
3908    def test_get_set_inheritable(self):
3909        fd = os.open(__file__, os.O_RDONLY)
3910        self.addCleanup(os.close, fd)
3911        self.assertEqual(os.get_inheritable(fd), False)
3912
3913        os.set_inheritable(fd, True)
3914        self.assertEqual(os.get_inheritable(fd), True)
3915
3916    @unittest.skipIf(fcntl is None, "need fcntl")
3917    def test_get_inheritable_cloexec(self):
3918        fd = os.open(__file__, os.O_RDONLY)
3919        self.addCleanup(os.close, fd)
3920        self.assertEqual(os.get_inheritable(fd), False)
3921
3922        # clear FD_CLOEXEC flag
3923        flags = fcntl.fcntl(fd, fcntl.F_GETFD)
3924        flags &= ~fcntl.FD_CLOEXEC
3925        fcntl.fcntl(fd, fcntl.F_SETFD, flags)
3926
3927        self.assertEqual(os.get_inheritable(fd), True)
3928
3929    @unittest.skipIf(fcntl is None, "need fcntl")
3930    def test_set_inheritable_cloexec(self):
3931        fd = os.open(__file__, os.O_RDONLY)
3932        self.addCleanup(os.close, fd)
3933        self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3934                         fcntl.FD_CLOEXEC)
3935
3936        os.set_inheritable(fd, True)
3937        self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3938                         0)
3939
3940    @unittest.skipUnless(hasattr(os, 'O_PATH'), "need os.O_PATH")
3941    def test_get_set_inheritable_o_path(self):
3942        fd = os.open(__file__, os.O_PATH)
3943        self.addCleanup(os.close, fd)
3944        self.assertEqual(os.get_inheritable(fd), False)
3945
3946        os.set_inheritable(fd, True)
3947        self.assertEqual(os.get_inheritable(fd), True)
3948
3949        os.set_inheritable(fd, False)
3950        self.assertEqual(os.get_inheritable(fd), False)
3951
3952    def test_get_set_inheritable_badf(self):
3953        fd = os_helper.make_bad_fd()
3954
3955        with self.assertRaises(OSError) as ctx:
3956            os.get_inheritable(fd)
3957        self.assertEqual(ctx.exception.errno, errno.EBADF)
3958
3959        with self.assertRaises(OSError) as ctx:
3960            os.set_inheritable(fd, True)
3961        self.assertEqual(ctx.exception.errno, errno.EBADF)
3962
3963        with self.assertRaises(OSError) as ctx:
3964            os.set_inheritable(fd, False)
3965        self.assertEqual(ctx.exception.errno, errno.EBADF)
3966
3967    def test_open(self):
3968        fd = os.open(__file__, os.O_RDONLY)
3969        self.addCleanup(os.close, fd)
3970        self.assertEqual(os.get_inheritable(fd), False)
3971
3972    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
3973    def test_pipe(self):
3974        rfd, wfd = os.pipe()
3975        self.addCleanup(os.close, rfd)
3976        self.addCleanup(os.close, wfd)
3977        self.assertEqual(os.get_inheritable(rfd), False)
3978        self.assertEqual(os.get_inheritable(wfd), False)
3979
3980    def test_dup(self):
3981        fd1 = os.open(__file__, os.O_RDONLY)
3982        self.addCleanup(os.close, fd1)
3983
3984        fd2 = os.dup(fd1)
3985        self.addCleanup(os.close, fd2)
3986        self.assertEqual(os.get_inheritable(fd2), False)
3987
3988    def test_dup_standard_stream(self):
3989        fd = os.dup(1)
3990        self.addCleanup(os.close, fd)
3991        self.assertGreater(fd, 0)
3992
3993    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
3994    def test_dup_nul(self):
3995        # os.dup() was creating inheritable fds for character files.
3996        fd1 = os.open('NUL', os.O_RDONLY)
3997        self.addCleanup(os.close, fd1)
3998        fd2 = os.dup(fd1)
3999        self.addCleanup(os.close, fd2)
4000        self.assertFalse(os.get_inheritable(fd2))
4001
4002    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
4003    def test_dup2(self):
4004        fd = os.open(__file__, os.O_RDONLY)
4005        self.addCleanup(os.close, fd)
4006
4007        # inheritable by default
4008        fd2 = os.open(__file__, os.O_RDONLY)
4009        self.addCleanup(os.close, fd2)
4010        self.assertEqual(os.dup2(fd, fd2), fd2)
4011        self.assertTrue(os.get_inheritable(fd2))
4012
4013        # force non-inheritable
4014        fd3 = os.open(__file__, os.O_RDONLY)
4015        self.addCleanup(os.close, fd3)
4016        self.assertEqual(os.dup2(fd, fd3, inheritable=False), fd3)
4017        self.assertFalse(os.get_inheritable(fd3))
4018
4019    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
4020    def test_openpty(self):
4021        master_fd, slave_fd = os.openpty()
4022        self.addCleanup(os.close, master_fd)
4023        self.addCleanup(os.close, slave_fd)
4024        self.assertEqual(os.get_inheritable(master_fd), False)
4025        self.assertEqual(os.get_inheritable(slave_fd), False)
4026
4027
4028class PathTConverterTests(unittest.TestCase):
4029    # tuples of (function name, allows fd arguments, additional arguments to
4030    # function, cleanup function)
4031    functions = [
4032        ('stat', True, (), None),
4033        ('lstat', False, (), None),
4034        ('access', False, (os.F_OK,), None),
4035        ('chflags', False, (0,), None),
4036        ('lchflags', False, (0,), None),
4037        ('open', False, (os.O_RDONLY,), getattr(os, 'close', None)),
4038    ]
4039
4040    def test_path_t_converter(self):
4041        str_filename = os_helper.TESTFN
4042        if os.name == 'nt':
4043            bytes_fspath = bytes_filename = None
4044        else:
4045            bytes_filename = os.fsencode(os_helper.TESTFN)
4046            bytes_fspath = FakePath(bytes_filename)
4047        fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT)
4048        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
4049        self.addCleanup(os.close, fd)
4050
4051        int_fspath = FakePath(fd)
4052        str_fspath = FakePath(str_filename)
4053
4054        for name, allow_fd, extra_args, cleanup_fn in self.functions:
4055            with self.subTest(name=name):
4056                try:
4057                    fn = getattr(os, name)
4058                except AttributeError:
4059                    continue
4060
4061                for path in (str_filename, bytes_filename, str_fspath,
4062                             bytes_fspath):
4063                    if path is None:
4064                        continue
4065                    with self.subTest(name=name, path=path):
4066                        result = fn(path, *extra_args)
4067                        if cleanup_fn is not None:
4068                            cleanup_fn(result)
4069
4070                with self.assertRaisesRegex(
4071                        TypeError, 'to return str or bytes'):
4072                    fn(int_fspath, *extra_args)
4073
4074                if allow_fd:
4075                    result = fn(fd, *extra_args)  # should not fail
4076                    if cleanup_fn is not None:
4077                        cleanup_fn(result)
4078                else:
4079                    with self.assertRaisesRegex(
4080                            TypeError,
4081                            'os.PathLike'):
4082                        fn(fd, *extra_args)
4083
4084    def test_path_t_converter_and_custom_class(self):
4085        msg = r'__fspath__\(\) to return str or bytes, not %s'
4086        with self.assertRaisesRegex(TypeError, msg % r'int'):
4087            os.stat(FakePath(2))
4088        with self.assertRaisesRegex(TypeError, msg % r'float'):
4089            os.stat(FakePath(2.34))
4090        with self.assertRaisesRegex(TypeError, msg % r'object'):
4091            os.stat(FakePath(object()))
4092
4093
4094@unittest.skipUnless(hasattr(os, 'get_blocking'),
4095                     'needs os.get_blocking() and os.set_blocking()')
4096@unittest.skipIf(support.is_emscripten, "Cannot unset blocking flag")
4097class BlockingTests(unittest.TestCase):
4098    def test_blocking(self):
4099        fd = os.open(__file__, os.O_RDONLY)
4100        self.addCleanup(os.close, fd)
4101        self.assertEqual(os.get_blocking(fd), True)
4102
4103        os.set_blocking(fd, False)
4104        self.assertEqual(os.get_blocking(fd), False)
4105
4106        os.set_blocking(fd, True)
4107        self.assertEqual(os.get_blocking(fd), True)
4108
4109
4110
4111class ExportsTests(unittest.TestCase):
4112    def test_os_all(self):
4113        self.assertIn('open', os.__all__)
4114        self.assertIn('walk', os.__all__)
4115
4116
4117class TestDirEntry(unittest.TestCase):
4118    def setUp(self):
4119        self.path = os.path.realpath(os_helper.TESTFN)
4120        self.addCleanup(os_helper.rmtree, self.path)
4121        os.mkdir(self.path)
4122
4123    def test_uninstantiable(self):
4124        self.assertRaises(TypeError, os.DirEntry)
4125
4126    def test_unpickable(self):
4127        filename = create_file(os.path.join(self.path, "file.txt"), b'python')
4128        entry = [entry for entry in os.scandir(self.path)].pop()
4129        self.assertIsInstance(entry, os.DirEntry)
4130        self.assertEqual(entry.name, "file.txt")
4131        import pickle
4132        self.assertRaises(TypeError, pickle.dumps, entry, filename)
4133
4134
4135class TestScandir(unittest.TestCase):
4136    check_no_resource_warning = warnings_helper.check_no_resource_warning
4137
4138    def setUp(self):
4139        self.path = os.path.realpath(os_helper.TESTFN)
4140        self.bytes_path = os.fsencode(self.path)
4141        self.addCleanup(os_helper.rmtree, self.path)
4142        os.mkdir(self.path)
4143
4144    def create_file(self, name="file.txt"):
4145        path = self.bytes_path if isinstance(name, bytes) else self.path
4146        filename = os.path.join(path, name)
4147        create_file(filename, b'python')
4148        return filename
4149
4150    def get_entries(self, names):
4151        entries = dict((entry.name, entry)
4152                       for entry in os.scandir(self.path))
4153        self.assertEqual(sorted(entries.keys()), names)
4154        return entries
4155
4156    def assert_stat_equal(self, stat1, stat2, skip_fields):
4157        if skip_fields:
4158            for attr in dir(stat1):
4159                if not attr.startswith("st_"):
4160                    continue
4161                if attr in ("st_dev", "st_ino", "st_nlink"):
4162                    continue
4163                self.assertEqual(getattr(stat1, attr),
4164                                 getattr(stat2, attr),
4165                                 (stat1, stat2, attr))
4166        else:
4167            self.assertEqual(stat1, stat2)
4168
4169    def test_uninstantiable(self):
4170        scandir_iter = os.scandir(self.path)
4171        self.assertRaises(TypeError, type(scandir_iter))
4172        scandir_iter.close()
4173
4174    def test_unpickable(self):
4175        filename = self.create_file("file.txt")
4176        scandir_iter = os.scandir(self.path)
4177        import pickle
4178        self.assertRaises(TypeError, pickle.dumps, scandir_iter, filename)
4179        scandir_iter.close()
4180
4181    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
4182        self.assertIsInstance(entry, os.DirEntry)
4183        self.assertEqual(entry.name, name)
4184        self.assertEqual(entry.path, os.path.join(self.path, name))
4185        self.assertEqual(entry.inode(),
4186                         os.stat(entry.path, follow_symlinks=False).st_ino)
4187
4188        entry_stat = os.stat(entry.path)
4189        self.assertEqual(entry.is_dir(),
4190                         stat.S_ISDIR(entry_stat.st_mode))
4191        self.assertEqual(entry.is_file(),
4192                         stat.S_ISREG(entry_stat.st_mode))
4193        self.assertEqual(entry.is_symlink(),
4194                         os.path.islink(entry.path))
4195
4196        entry_lstat = os.stat(entry.path, follow_symlinks=False)
4197        self.assertEqual(entry.is_dir(follow_symlinks=False),
4198                         stat.S_ISDIR(entry_lstat.st_mode))
4199        self.assertEqual(entry.is_file(follow_symlinks=False),
4200                         stat.S_ISREG(entry_lstat.st_mode))
4201
4202        self.assert_stat_equal(entry.stat(),
4203                               entry_stat,
4204                               os.name == 'nt' and not is_symlink)
4205        self.assert_stat_equal(entry.stat(follow_symlinks=False),
4206                               entry_lstat,
4207                               os.name == 'nt')
4208
4209    def test_attributes(self):
4210        link = hasattr(os, 'link')
4211        symlink = os_helper.can_symlink()
4212
4213        dirname = os.path.join(self.path, "dir")
4214        os.mkdir(dirname)
4215        filename = self.create_file("file.txt")
4216        if link:
4217            try:
4218                os.link(filename, os.path.join(self.path, "link_file.txt"))
4219            except PermissionError as e:
4220                self.skipTest('os.link(): %s' % e)
4221        if symlink:
4222            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
4223                       target_is_directory=True)
4224            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
4225
4226        names = ['dir', 'file.txt']
4227        if link:
4228            names.append('link_file.txt')
4229        if symlink:
4230            names.extend(('symlink_dir', 'symlink_file.txt'))
4231        entries = self.get_entries(names)
4232
4233        entry = entries['dir']
4234        self.check_entry(entry, 'dir', True, False, False)
4235
4236        entry = entries['file.txt']
4237        self.check_entry(entry, 'file.txt', False, True, False)
4238
4239        if link:
4240            entry = entries['link_file.txt']
4241            self.check_entry(entry, 'link_file.txt', False, True, False)
4242
4243        if symlink:
4244            entry = entries['symlink_dir']
4245            self.check_entry(entry, 'symlink_dir', True, False, True)
4246
4247            entry = entries['symlink_file.txt']
4248            self.check_entry(entry, 'symlink_file.txt', False, True, True)
4249
4250    def get_entry(self, name):
4251        path = self.bytes_path if isinstance(name, bytes) else self.path
4252        entries = list(os.scandir(path))
4253        self.assertEqual(len(entries), 1)
4254
4255        entry = entries[0]
4256        self.assertEqual(entry.name, name)
4257        return entry
4258
4259    def create_file_entry(self, name='file.txt'):
4260        filename = self.create_file(name=name)
4261        return self.get_entry(os.path.basename(filename))
4262
4263    def test_current_directory(self):
4264        filename = self.create_file()
4265        old_dir = os.getcwd()
4266        try:
4267            os.chdir(self.path)
4268
4269            # call scandir() without parameter: it must list the content
4270            # of the current directory
4271            entries = dict((entry.name, entry) for entry in os.scandir())
4272            self.assertEqual(sorted(entries.keys()),
4273                             [os.path.basename(filename)])
4274        finally:
4275            os.chdir(old_dir)
4276
4277    def test_repr(self):
4278        entry = self.create_file_entry()
4279        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
4280
4281    def test_fspath_protocol(self):
4282        entry = self.create_file_entry()
4283        self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))
4284
4285    def test_fspath_protocol_bytes(self):
4286        bytes_filename = os.fsencode('bytesfile.txt')
4287        bytes_entry = self.create_file_entry(name=bytes_filename)
4288        fspath = os.fspath(bytes_entry)
4289        self.assertIsInstance(fspath, bytes)
4290        self.assertEqual(fspath,
4291                         os.path.join(os.fsencode(self.path),bytes_filename))
4292
4293    def test_removed_dir(self):
4294        path = os.path.join(self.path, 'dir')
4295
4296        os.mkdir(path)
4297        entry = self.get_entry('dir')
4298        os.rmdir(path)
4299
4300        # On POSIX, is_dir() result depends if scandir() filled d_type or not
4301        if os.name == 'nt':
4302            self.assertTrue(entry.is_dir())
4303        self.assertFalse(entry.is_file())
4304        self.assertFalse(entry.is_symlink())
4305        if os.name == 'nt':
4306            self.assertRaises(FileNotFoundError, entry.inode)
4307            # don't fail
4308            entry.stat()
4309            entry.stat(follow_symlinks=False)
4310        else:
4311            self.assertGreater(entry.inode(), 0)
4312            self.assertRaises(FileNotFoundError, entry.stat)
4313            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
4314
4315    def test_removed_file(self):
4316        entry = self.create_file_entry()
4317        os.unlink(entry.path)
4318
4319        self.assertFalse(entry.is_dir())
4320        # On POSIX, is_dir() result depends if scandir() filled d_type or not
4321        if os.name == 'nt':
4322            self.assertTrue(entry.is_file())
4323        self.assertFalse(entry.is_symlink())
4324        if os.name == 'nt':
4325            self.assertRaises(FileNotFoundError, entry.inode)
4326            # don't fail
4327            entry.stat()
4328            entry.stat(follow_symlinks=False)
4329        else:
4330            self.assertGreater(entry.inode(), 0)
4331            self.assertRaises(FileNotFoundError, entry.stat)
4332            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
4333
4334    def test_broken_symlink(self):
4335        if not os_helper.can_symlink():
4336            return self.skipTest('cannot create symbolic link')
4337
4338        filename = self.create_file("file.txt")
4339        os.symlink(filename,
4340                   os.path.join(self.path, "symlink.txt"))
4341        entries = self.get_entries(['file.txt', 'symlink.txt'])
4342        entry = entries['symlink.txt']
4343        os.unlink(filename)
4344
4345        self.assertGreater(entry.inode(), 0)
4346        self.assertFalse(entry.is_dir())
4347        self.assertFalse(entry.is_file())  # broken symlink returns False
4348        self.assertFalse(entry.is_dir(follow_symlinks=False))
4349        self.assertFalse(entry.is_file(follow_symlinks=False))
4350        self.assertTrue(entry.is_symlink())
4351        self.assertRaises(FileNotFoundError, entry.stat)
4352        # don't fail
4353        entry.stat(follow_symlinks=False)
4354
4355    def test_bytes(self):
4356        self.create_file("file.txt")
4357
4358        path_bytes = os.fsencode(self.path)
4359        entries = list(os.scandir(path_bytes))
4360        self.assertEqual(len(entries), 1, entries)
4361        entry = entries[0]
4362
4363        self.assertEqual(entry.name, b'file.txt')
4364        self.assertEqual(entry.path,
4365                         os.fsencode(os.path.join(self.path, 'file.txt')))
4366
4367    def test_bytes_like(self):
4368        self.create_file("file.txt")
4369
4370        for cls in bytearray, memoryview:
4371            path_bytes = cls(os.fsencode(self.path))
4372            with self.assertWarns(DeprecationWarning):
4373                entries = list(os.scandir(path_bytes))
4374            self.assertEqual(len(entries), 1, entries)
4375            entry = entries[0]
4376
4377            self.assertEqual(entry.name, b'file.txt')
4378            self.assertEqual(entry.path,
4379                             os.fsencode(os.path.join(self.path, 'file.txt')))
4380            self.assertIs(type(entry.name), bytes)
4381            self.assertIs(type(entry.path), bytes)
4382
4383    @unittest.skipUnless(os.listdir in os.supports_fd,
4384                         'fd support for listdir required for this test.')
4385    def test_fd(self):
4386        self.assertIn(os.scandir, os.supports_fd)
4387        self.create_file('file.txt')
4388        expected_names = ['file.txt']
4389        if os_helper.can_symlink():
4390            os.symlink('file.txt', os.path.join(self.path, 'link'))
4391            expected_names.append('link')
4392
4393        with os_helper.open_dir_fd(self.path) as fd:
4394            with os.scandir(fd) as it:
4395                entries = list(it)
4396            names = [entry.name for entry in entries]
4397            self.assertEqual(sorted(names), expected_names)
4398            self.assertEqual(names, os.listdir(fd))
4399            for entry in entries:
4400                self.assertEqual(entry.path, entry.name)
4401                self.assertEqual(os.fspath(entry), entry.name)
4402                self.assertEqual(entry.is_symlink(), entry.name == 'link')
4403                if os.stat in os.supports_dir_fd:
4404                    st = os.stat(entry.name, dir_fd=fd)
4405                    self.assertEqual(entry.stat(), st)
4406                    st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
4407                    self.assertEqual(entry.stat(follow_symlinks=False), st)
4408
4409    @unittest.skipIf(support.is_wasi, "WASI maps '' to cwd")
4410    def test_empty_path(self):
4411        self.assertRaises(FileNotFoundError, os.scandir, '')
4412
4413    def test_consume_iterator_twice(self):
4414        self.create_file("file.txt")
4415        iterator = os.scandir(self.path)
4416
4417        entries = list(iterator)
4418        self.assertEqual(len(entries), 1, entries)
4419
4420        # check than consuming the iterator twice doesn't raise exception
4421        entries2 = list(iterator)
4422        self.assertEqual(len(entries2), 0, entries2)
4423
4424    def test_bad_path_type(self):
4425        for obj in [1.234, {}, []]:
4426            self.assertRaises(TypeError, os.scandir, obj)
4427
4428    def test_close(self):
4429        self.create_file("file.txt")
4430        self.create_file("file2.txt")
4431        iterator = os.scandir(self.path)
4432        next(iterator)
4433        iterator.close()
4434        # multiple closes
4435        iterator.close()
4436        with self.check_no_resource_warning():
4437            del iterator
4438
4439    def test_context_manager(self):
4440        self.create_file("file.txt")
4441        self.create_file("file2.txt")
4442        with os.scandir(self.path) as iterator:
4443            next(iterator)
4444        with self.check_no_resource_warning():
4445            del iterator
4446
4447    def test_context_manager_close(self):
4448        self.create_file("file.txt")
4449        self.create_file("file2.txt")
4450        with os.scandir(self.path) as iterator:
4451            next(iterator)
4452            iterator.close()
4453
4454    def test_context_manager_exception(self):
4455        self.create_file("file.txt")
4456        self.create_file("file2.txt")
4457        with self.assertRaises(ZeroDivisionError):
4458            with os.scandir(self.path) as iterator:
4459                next(iterator)
4460                1/0
4461        with self.check_no_resource_warning():
4462            del iterator
4463
4464    def test_resource_warning(self):
4465        self.create_file("file.txt")
4466        self.create_file("file2.txt")
4467        iterator = os.scandir(self.path)
4468        next(iterator)
4469        with self.assertWarns(ResourceWarning):
4470            del iterator
4471            support.gc_collect()
4472        # exhausted iterator
4473        iterator = os.scandir(self.path)
4474        list(iterator)
4475        with self.check_no_resource_warning():
4476            del iterator
4477
4478
4479class TestPEP519(unittest.TestCase):
4480
4481    # Abstracted so it can be overridden to test pure Python implementation
4482    # if a C version is provided.
4483    fspath = staticmethod(os.fspath)
4484
4485    def test_return_bytes(self):
4486        for b in b'hello', b'goodbye', b'some/path/and/file':
4487            self.assertEqual(b, self.fspath(b))
4488
4489    def test_return_string(self):
4490        for s in 'hello', 'goodbye', 'some/path/and/file':
4491            self.assertEqual(s, self.fspath(s))
4492
4493    def test_fsencode_fsdecode(self):
4494        for p in "path/like/object", b"path/like/object":
4495            pathlike = FakePath(p)
4496
4497            self.assertEqual(p, self.fspath(pathlike))
4498            self.assertEqual(b"path/like/object", os.fsencode(pathlike))
4499            self.assertEqual("path/like/object", os.fsdecode(pathlike))
4500
4501    def test_pathlike(self):
4502        self.assertEqual('#feelthegil', self.fspath(FakePath('#feelthegil')))
4503        self.assertTrue(issubclass(FakePath, os.PathLike))
4504        self.assertTrue(isinstance(FakePath('x'), os.PathLike))
4505
4506    def test_garbage_in_exception_out(self):
4507        vapor = type('blah', (), {})
4508        for o in int, type, os, vapor():
4509            self.assertRaises(TypeError, self.fspath, o)
4510
4511    def test_argument_required(self):
4512        self.assertRaises(TypeError, self.fspath)
4513
4514    def test_bad_pathlike(self):
4515        # __fspath__ returns a value other than str or bytes.
4516        self.assertRaises(TypeError, self.fspath, FakePath(42))
4517        # __fspath__ attribute that is not callable.
4518        c = type('foo', (), {})
4519        c.__fspath__ = 1
4520        self.assertRaises(TypeError, self.fspath, c())
4521        # __fspath__ raises an exception.
4522        self.assertRaises(ZeroDivisionError, self.fspath,
4523                          FakePath(ZeroDivisionError()))
4524
4525    def test_pathlike_subclasshook(self):
4526        # bpo-38878: subclasshook causes subclass checks
4527        # true on abstract implementation.
4528        class A(os.PathLike):
4529            pass
4530        self.assertFalse(issubclass(FakePath, A))
4531        self.assertTrue(issubclass(FakePath, os.PathLike))
4532
4533    def test_pathlike_class_getitem(self):
4534        self.assertIsInstance(os.PathLike[bytes], types.GenericAlias)
4535
4536
4537class TimesTests(unittest.TestCase):
4538    def test_times(self):
4539        times = os.times()
4540        self.assertIsInstance(times, os.times_result)
4541
4542        for field in ('user', 'system', 'children_user', 'children_system',
4543                      'elapsed'):
4544            value = getattr(times, field)
4545            self.assertIsInstance(value, float)
4546
4547        if os.name == 'nt':
4548            self.assertEqual(times.children_user, 0)
4549            self.assertEqual(times.children_system, 0)
4550            self.assertEqual(times.elapsed, 0)
4551
4552
4553@support.requires_fork()
4554class ForkTests(unittest.TestCase):
4555    def test_fork(self):
4556        # bpo-42540: ensure os.fork() with non-default memory allocator does
4557        # not crash on exit.
4558        code = """if 1:
4559            import os
4560            from test import support
4561            pid = os.fork()
4562            if pid != 0:
4563                support.wait_process(pid, exitcode=0)
4564        """
4565        assert_python_ok("-c", code)
4566        assert_python_ok("-c", code, PYTHONMALLOC="malloc_debug")
4567
4568
4569# Only test if the C version is provided, otherwise TestPEP519 already tested
4570# the pure Python implementation.
4571if hasattr(os, "_fspath"):
4572    class TestPEP519PurePython(TestPEP519):
4573
4574        """Explicitly test the pure Python implementation of os.fspath()."""
4575
4576        fspath = staticmethod(os._fspath)
4577
4578
4579if __name__ == "__main__":
4580    unittest.main()
4581