1# Adapted from test_file.py by Daniel Stutzbach
2
3import sys
4import os
5import io
6import errno
7import unittest
8from array import array
9from weakref import proxy
10from functools import wraps
11
12from test.support import (
13    cpython_only, swap_attr, gc_collect, is_emscripten, is_wasi
14)
15from test.support.os_helper import (
16    TESTFN, TESTFN_ASCII, TESTFN_UNICODE, make_bad_fd,
17    )
18from test.support.warnings_helper import check_warnings
19from collections import UserList
20
21import _io  # C implementation of io
22import _pyio # Python implementation of io
23
24
25class AutoFileTests:
26    # file tests for which a test file is automatically set up
27
28    def setUp(self):
29        self.f = self.FileIO(TESTFN, 'w')
30
31    def tearDown(self):
32        if self.f:
33            self.f.close()
34        os.remove(TESTFN)
35
36    def testWeakRefs(self):
37        # verify weak references
38        p = proxy(self.f)
39        p.write(bytes(range(10)))
40        self.assertEqual(self.f.tell(), p.tell())
41        self.f.close()
42        self.f = None
43        gc_collect()  # For PyPy or other GCs.
44        self.assertRaises(ReferenceError, getattr, p, 'tell')
45
46    def testSeekTell(self):
47        self.f.write(bytes(range(20)))
48        self.assertEqual(self.f.tell(), 20)
49        self.f.seek(0)
50        self.assertEqual(self.f.tell(), 0)
51        self.f.seek(10)
52        self.assertEqual(self.f.tell(), 10)
53        self.f.seek(5, 1)
54        self.assertEqual(self.f.tell(), 15)
55        self.f.seek(-5, 1)
56        self.assertEqual(self.f.tell(), 10)
57        self.f.seek(-5, 2)
58        self.assertEqual(self.f.tell(), 15)
59
60    def testAttributes(self):
61        # verify expected attributes exist
62        f = self.f
63
64        self.assertEqual(f.mode, "wb")
65        self.assertEqual(f.closed, False)
66
67        # verify the attributes are readonly
68        for attr in 'mode', 'closed':
69            self.assertRaises((AttributeError, TypeError),
70                              setattr, f, attr, 'oops')
71
72    @unittest.skipIf(is_wasi, "WASI does not expose st_blksize.")
73    def testBlksize(self):
74        # test private _blksize attribute
75        blksize = io.DEFAULT_BUFFER_SIZE
76        # try to get preferred blksize from stat.st_blksize, if available
77        if hasattr(os, 'fstat'):
78            fst = os.fstat(self.f.fileno())
79            blksize = getattr(fst, 'st_blksize', blksize)
80        self.assertEqual(self.f._blksize, blksize)
81
82    # verify readinto
83    def testReadintoByteArray(self):
84        self.f.write(bytes([1, 2, 0, 255]))
85        self.f.close()
86
87        ba = bytearray(b'abcdefgh')
88        with self.FileIO(TESTFN, 'r') as f:
89            n = f.readinto(ba)
90        self.assertEqual(ba, b'\x01\x02\x00\xffefgh')
91        self.assertEqual(n, 4)
92
93    def _testReadintoMemoryview(self):
94        self.f.write(bytes([1, 2, 0, 255]))
95        self.f.close()
96
97        m = memoryview(bytearray(b'abcdefgh'))
98        with self.FileIO(TESTFN, 'r') as f:
99            n = f.readinto(m)
100        self.assertEqual(m, b'\x01\x02\x00\xffefgh')
101        self.assertEqual(n, 4)
102
103        m = memoryview(bytearray(b'abcdefgh')).cast('H', shape=[2, 2])
104        with self.FileIO(TESTFN, 'r') as f:
105            n = f.readinto(m)
106        self.assertEqual(bytes(m), b'\x01\x02\x00\xffefgh')
107        self.assertEqual(n, 4)
108
109    def _testReadintoArray(self):
110        self.f.write(bytes([1, 2, 0, 255]))
111        self.f.close()
112
113        a = array('B', b'abcdefgh')
114        with self.FileIO(TESTFN, 'r') as f:
115            n = f.readinto(a)
116        self.assertEqual(a, array('B', [1, 2, 0, 255, 101, 102, 103, 104]))
117        self.assertEqual(n, 4)
118
119        a = array('b', b'abcdefgh')
120        with self.FileIO(TESTFN, 'r') as f:
121            n = f.readinto(a)
122        self.assertEqual(a, array('b', [1, 2, 0, -1, 101, 102, 103, 104]))
123        self.assertEqual(n, 4)
124
125        a = array('I', b'abcdefgh')
126        with self.FileIO(TESTFN, 'r') as f:
127            n = f.readinto(a)
128        self.assertEqual(a, array('I', b'\x01\x02\x00\xffefgh'))
129        self.assertEqual(n, 4)
130
131    def testWritelinesList(self):
132        l = [b'123', b'456']
133        self.f.writelines(l)
134        self.f.close()
135        self.f = self.FileIO(TESTFN, 'rb')
136        buf = self.f.read()
137        self.assertEqual(buf, b'123456')
138
139    def testWritelinesUserList(self):
140        l = UserList([b'123', b'456'])
141        self.f.writelines(l)
142        self.f.close()
143        self.f = self.FileIO(TESTFN, 'rb')
144        buf = self.f.read()
145        self.assertEqual(buf, b'123456')
146
147    def testWritelinesError(self):
148        self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])
149        self.assertRaises(TypeError, self.f.writelines, None)
150        self.assertRaises(TypeError, self.f.writelines, "abc")
151
152    def test_none_args(self):
153        self.f.write(b"hi\nbye\nabc")
154        self.f.close()
155        self.f = self.FileIO(TESTFN, 'r')
156        self.assertEqual(self.f.read(None), b"hi\nbye\nabc")
157        self.f.seek(0)
158        self.assertEqual(self.f.readline(None), b"hi\n")
159        self.assertEqual(self.f.readlines(None), [b"bye\n", b"abc"])
160
161    def test_reject(self):
162        self.assertRaises(TypeError, self.f.write, "Hello!")
163
164    def testRepr(self):
165        self.assertEqual(repr(self.f),
166                         "<%s.FileIO name=%r mode=%r closefd=True>" %
167                         (self.modulename, self.f.name, self.f.mode))
168        del self.f.name
169        self.assertEqual(repr(self.f),
170                         "<%s.FileIO fd=%r mode=%r closefd=True>" %
171                         (self.modulename, self.f.fileno(), self.f.mode))
172        self.f.close()
173        self.assertEqual(repr(self.f),
174                         "<%s.FileIO [closed]>" % (self.modulename,))
175
176    def testReprNoCloseFD(self):
177        fd = os.open(TESTFN, os.O_RDONLY)
178        try:
179            with self.FileIO(fd, 'r', closefd=False) as f:
180                self.assertEqual(repr(f),
181                                 "<%s.FileIO name=%r mode=%r closefd=False>" %
182                                 (self.modulename, f.name, f.mode))
183        finally:
184            os.close(fd)
185
186    def testRecursiveRepr(self):
187        # Issue #25455
188        with swap_attr(self.f, 'name', self.f):
189            with self.assertRaises(RuntimeError):
190                repr(self.f)  # Should not crash
191
192    def testErrors(self):
193        f = self.f
194        self.assertFalse(f.isatty())
195        self.assertFalse(f.closed)
196        #self.assertEqual(f.name, TESTFN)
197        self.assertRaises(ValueError, f.read, 10) # Open for reading
198        f.close()
199        self.assertTrue(f.closed)
200        f = self.FileIO(TESTFN, 'r')
201        self.assertRaises(TypeError, f.readinto, "")
202        self.assertFalse(f.closed)
203        f.close()
204        self.assertTrue(f.closed)
205
206    def testMethods(self):
207        methods = ['fileno', 'isatty', 'seekable', 'readable', 'writable',
208                   'read', 'readall', 'readline', 'readlines',
209                   'tell', 'truncate', 'flush']
210
211        self.f.close()
212        self.assertTrue(self.f.closed)
213
214        for methodname in methods:
215            method = getattr(self.f, methodname)
216            # should raise on closed file
217            self.assertRaises(ValueError, method)
218
219        self.assertRaises(TypeError, self.f.readinto)
220        self.assertRaises(ValueError, self.f.readinto, bytearray(1))
221        self.assertRaises(TypeError, self.f.seek)
222        self.assertRaises(ValueError, self.f.seek, 0)
223        self.assertRaises(TypeError, self.f.write)
224        self.assertRaises(ValueError, self.f.write, b'')
225        self.assertRaises(TypeError, self.f.writelines)
226        self.assertRaises(ValueError, self.f.writelines, b'')
227
228    def testOpendir(self):
229        # Issue 3703: opening a directory should fill the errno
230        # Windows always returns "[Errno 13]: Permission denied
231        # Unix uses fstat and returns "[Errno 21]: Is a directory"
232        try:
233            self.FileIO('.', 'r')
234        except OSError as e:
235            self.assertNotEqual(e.errno, 0)
236            self.assertEqual(e.filename, ".")
237        else:
238            self.fail("Should have raised OSError")
239
240    @unittest.skipIf(os.name == 'nt', "test only works on a POSIX-like system")
241    def testOpenDirFD(self):
242        fd = os.open('.', os.O_RDONLY)
243        with self.assertRaises(OSError) as cm:
244            self.FileIO(fd, 'r')
245        os.close(fd)
246        self.assertEqual(cm.exception.errno, errno.EISDIR)
247
248    #A set of functions testing that we get expected behaviour if someone has
249    #manually closed the internal file descriptor.  First, a decorator:
250    def ClosedFD(func):
251        @wraps(func)
252        def wrapper(self):
253            #forcibly close the fd before invoking the problem function
254            f = self.f
255            os.close(f.fileno())
256            try:
257                func(self, f)
258            finally:
259                try:
260                    self.f.close()
261                except OSError:
262                    pass
263        return wrapper
264
265    def ClosedFDRaises(func):
266        @wraps(func)
267        def wrapper(self):
268            #forcibly close the fd before invoking the problem function
269            f = self.f
270            os.close(f.fileno())
271            try:
272                func(self, f)
273            except OSError as e:
274                self.assertEqual(e.errno, errno.EBADF)
275            else:
276                self.fail("Should have raised OSError")
277            finally:
278                try:
279                    self.f.close()
280                except OSError:
281                    pass
282        return wrapper
283
284    @ClosedFDRaises
285    def testErrnoOnClose(self, f):
286        f.close()
287
288    @ClosedFDRaises
289    def testErrnoOnClosedWrite(self, f):
290        f.write(b'a')
291
292    @ClosedFDRaises
293    def testErrnoOnClosedSeek(self, f):
294        f.seek(0)
295
296    @ClosedFDRaises
297    def testErrnoOnClosedTell(self, f):
298        f.tell()
299
300    @ClosedFDRaises
301    def testErrnoOnClosedTruncate(self, f):
302        f.truncate(0)
303
304    @ClosedFD
305    def testErrnoOnClosedSeekable(self, f):
306        f.seekable()
307
308    @ClosedFD
309    def testErrnoOnClosedReadable(self, f):
310        f.readable()
311
312    @ClosedFD
313    def testErrnoOnClosedWritable(self, f):
314        f.writable()
315
316    @ClosedFD
317    def testErrnoOnClosedFileno(self, f):
318        f.fileno()
319
320    @ClosedFD
321    def testErrnoOnClosedIsatty(self, f):
322        self.assertEqual(f.isatty(), False)
323
324    def ReopenForRead(self):
325        try:
326            self.f.close()
327        except OSError:
328            pass
329        self.f = self.FileIO(TESTFN, 'r')
330        os.close(self.f.fileno())
331        return self.f
332
333    @ClosedFDRaises
334    def testErrnoOnClosedRead(self, f):
335        f = self.ReopenForRead()
336        f.read(1)
337
338    @ClosedFDRaises
339    def testErrnoOnClosedReadall(self, f):
340        f = self.ReopenForRead()
341        f.readall()
342
343    @ClosedFDRaises
344    def testErrnoOnClosedReadinto(self, f):
345        f = self.ReopenForRead()
346        a = array('b', b'x'*10)
347        f.readinto(a)
348
class CAutoFileTests(AutoFileTests, unittest.TestCase):
    # Runs the AutoFileTests mixin against the C implementation of FileIO.
    modulename = '_io'
    FileIO = _io.FileIO
352
class PyAutoFileTests(AutoFileTests, unittest.TestCase):
    # Runs the AutoFileTests mixin against the Python implementation of FileIO.
    modulename = '_pyio'
    FileIO = _pyio.FileIO
356
357
358class OtherFileTests:
359
360    def testAbles(self):
361        try:
362            f = self.FileIO(TESTFN, "w")
363            self.assertEqual(f.readable(), False)
364            self.assertEqual(f.writable(), True)
365            self.assertEqual(f.seekable(), True)
366            f.close()
367
368            f = self.FileIO(TESTFN, "r")
369            self.assertEqual(f.readable(), True)
370            self.assertEqual(f.writable(), False)
371            self.assertEqual(f.seekable(), True)
372            f.close()
373
374            f = self.FileIO(TESTFN, "a+")
375            self.assertEqual(f.readable(), True)
376            self.assertEqual(f.writable(), True)
377            self.assertEqual(f.seekable(), True)
378            self.assertEqual(f.isatty(), False)
379            f.close()
380
381            if sys.platform != "win32" and not is_emscripten:
382                try:
383                    f = self.FileIO("/dev/tty", "a")
384                except OSError:
385                    # When run in a cron job there just aren't any
386                    # ttys, so skip the test.  This also handles other
387                    # OS'es that don't support /dev/tty.
388                    pass
389                else:
390                    self.assertEqual(f.readable(), False)
391                    self.assertEqual(f.writable(), True)
392                    if sys.platform != "darwin" and \
393                       'bsd' not in sys.platform and \
394                       not sys.platform.startswith(('sunos', 'aix')):
395                        # Somehow /dev/tty appears seekable on some BSDs
396                        self.assertEqual(f.seekable(), False)
397                    self.assertEqual(f.isatty(), True)
398                    f.close()
399        finally:
400            os.unlink(TESTFN)
401
402    def testInvalidModeStrings(self):
403        # check invalid mode strings
404        for mode in ("", "aU", "wU+", "rw", "rt"):
405            try:
406                f = self.FileIO(TESTFN, mode)
407            except ValueError:
408                pass
409            else:
410                f.close()
411                self.fail('%r is an invalid file mode' % mode)
412
413    def testModeStrings(self):
414        # test that the mode attribute is correct for various mode strings
415        # given as init args
416        try:
417            for modes in [('w', 'wb'), ('wb', 'wb'), ('wb+', 'rb+'),
418                          ('w+b', 'rb+'), ('a', 'ab'), ('ab', 'ab'),
419                          ('ab+', 'ab+'), ('a+b', 'ab+'), ('r', 'rb'),
420                          ('rb', 'rb'), ('rb+', 'rb+'), ('r+b', 'rb+')]:
421                # read modes are last so that TESTFN will exist first
422                with self.FileIO(TESTFN, modes[0]) as f:
423                    self.assertEqual(f.mode, modes[1])
424        finally:
425            if os.path.exists(TESTFN):
426                os.unlink(TESTFN)
427
428    def testUnicodeOpen(self):
429        # verify repr works for unicode too
430        f = self.FileIO(str(TESTFN), "w")
431        f.close()
432        os.unlink(TESTFN)
433
434    def testBytesOpen(self):
435        # Opening a bytes filename
436        fn = TESTFN_ASCII.encode("ascii")
437        f = self.FileIO(fn, "w")
438        try:
439            f.write(b"abc")
440            f.close()
441            with open(TESTFN_ASCII, "rb") as f:
442                self.assertEqual(f.read(), b"abc")
443        finally:
444            os.unlink(TESTFN_ASCII)
445
446    @unittest.skipIf(sys.getfilesystemencoding() != 'utf-8',
447                     "test only works for utf-8 filesystems")
448    def testUtf8BytesOpen(self):
449        # Opening a UTF-8 bytes filename
450        try:
451            fn = TESTFN_UNICODE.encode("utf-8")
452        except UnicodeEncodeError:
453            self.skipTest('could not encode %r to utf-8' % TESTFN_UNICODE)
454        f = self.FileIO(fn, "w")
455        try:
456            f.write(b"abc")
457            f.close()
458            with open(TESTFN_UNICODE, "rb") as f:
459                self.assertEqual(f.read(), b"abc")
460        finally:
461            os.unlink(TESTFN_UNICODE)
462
463    def testConstructorHandlesNULChars(self):
464        fn_with_NUL = 'foo\0bar'
465        self.assertRaises(ValueError, self.FileIO, fn_with_NUL, 'w')
466        self.assertRaises(ValueError, self.FileIO, bytes(fn_with_NUL, 'ascii'), 'w')
467
468    def testInvalidFd(self):
469        self.assertRaises(ValueError, self.FileIO, -10)
470        self.assertRaises(OSError, self.FileIO, make_bad_fd())
471        if sys.platform == 'win32':
472            import msvcrt
473            self.assertRaises(OSError, msvcrt.get_osfhandle, make_bad_fd())
474
475    def testBadModeArgument(self):
476        # verify that we get a sensible error message for bad mode argument
477        bad_mode = "qwerty"
478        try:
479            f = self.FileIO(TESTFN, bad_mode)
480        except ValueError as msg:
481            if msg.args[0] != 0:
482                s = str(msg)
483                if TESTFN in s or bad_mode not in s:
484                    self.fail("bad error message for invalid mode: %s" % s)
485            # if msg.args[0] == 0, we're probably on Windows where there may be
486            # no obvious way to discover why open() failed.
487        else:
488            f.close()
489            self.fail("no error for invalid mode: %s" % bad_mode)
490
491    def testTruncate(self):
492        f = self.FileIO(TESTFN, 'w')
493        f.write(bytes(bytearray(range(10))))
494        self.assertEqual(f.tell(), 10)
495        f.truncate(5)
496        self.assertEqual(f.tell(), 10)
497        self.assertEqual(f.seek(0, io.SEEK_END), 5)
498        f.truncate(15)
499        self.assertEqual(f.tell(), 5)
500        self.assertEqual(f.seek(0, io.SEEK_END), 15)
501        f.close()
502
503    def testTruncateOnWindows(self):
504        def bug801631():
505            # SF bug <http://www.python.org/sf/801631>
506            # "file.truncate fault on windows"
507            f = self.FileIO(TESTFN, 'w')
508            f.write(bytes(range(11)))
509            f.close()
510
511            f = self.FileIO(TESTFN,'r+')
512            data = f.read(5)
513            if data != bytes(range(5)):
514                self.fail("Read on file opened for update failed %r" % data)
515            if f.tell() != 5:
516                self.fail("File pos after read wrong %d" % f.tell())
517
518            f.truncate()
519            if f.tell() != 5:
520                self.fail("File pos after ftruncate wrong %d" % f.tell())
521
522            f.close()
523            size = os.path.getsize(TESTFN)
524            if size != 5:
525                self.fail("File size after ftruncate wrong %d" % size)
526
527        try:
528            bug801631()
529        finally:
530            os.unlink(TESTFN)
531
532    def testAppend(self):
533        try:
534            f = open(TESTFN, 'wb')
535            f.write(b'spam')
536            f.close()
537            f = open(TESTFN, 'ab')
538            f.write(b'eggs')
539            f.close()
540            f = open(TESTFN, 'rb')
541            d = f.read()
542            f.close()
543            self.assertEqual(d, b'spameggs')
544        finally:
545            try:
546                os.unlink(TESTFN)
547            except:
548                pass
549
550    def testInvalidInit(self):
551        self.assertRaises(TypeError, self.FileIO, "1", 0, 0)
552
553    def testWarnings(self):
554        with check_warnings(quiet=True) as w:
555            self.assertEqual(w.warnings, [])
556            self.assertRaises(TypeError, self.FileIO, [])
557            self.assertEqual(w.warnings, [])
558            self.assertRaises(ValueError, self.FileIO, "/some/invalid/name", "rt")
559            self.assertEqual(w.warnings, [])
560
561    def testUnclosedFDOnException(self):
562        class MyException(Exception): pass
563        class MyFileIO(self.FileIO):
564            def __setattr__(self, name, value):
565                if name == "name":
566                    raise MyException("blocked setting name")
567                return super(MyFileIO, self).__setattr__(name, value)
568        fd = os.open(__file__, os.O_RDONLY)
569        self.assertRaises(MyException, MyFileIO, fd)
570        os.close(fd)  # should not raise OSError(EBADF)
571
572
class COtherFileTests(OtherFileTests, unittest.TestCase):
    # Runs the OtherFileTests mixin against the C implementation of FileIO,
    # plus tests that only make sense for that implementation.
    FileIO = _io.FileIO
    modulename = '_io'

    @cpython_only
    def testInvalidFd_overflow(self):
        # Issue 15989
        # Skip (instead of erroring out) when the optional _testcapi
        # extension module is not available in this build.
        from test.support import import_helper
        _testcapi = import_helper.import_module("_testcapi")
        self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MAX + 1)
        self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MIN - 1)

    def test_open_code(self):
        # Check that the default behaviour of open_code matches
        # open("rb")
        with self.FileIO(__file__, "rb") as f:
            expected = f.read()
        with _io.open_code(__file__) as f:
            actual = f.read()
        self.assertEqual(expected, actual)
592
593
class PyOtherFileTests(OtherFileTests, unittest.TestCase):
    # Runs the OtherFileTests mixin against the Python implementation of
    # FileIO.
    modulename = '_pyio'
    FileIO = _pyio.FileIO

    def test_open_code(self):
        # The bytes returned through the open_code path must equal what a
        # plain FileIO opened with "rb" reads from the same file.
        with self.FileIO(__file__, "rb") as raw:
            via_fileio = raw.read()
        with check_warnings(quiet=True) as recorder:
            # Always go through _open_code_with_warning; it is expected to
            # emit at least one warning.
            opener = _pyio._open_code_with_warning
            with opener(__file__) as stream:
                via_open_code = stream.read()
            self.assertEqual(via_fileio, via_open_code)
            self.assertNotEqual(recorder.warnings, [])
609
610
611def tearDownModule():
612    # Historically, these tests have been sloppy about removing TESTFN.
613    # So get rid of it no matter what.
614    if os.path.exists(TESTFN):
615        os.unlink(TESTFN)
616
617
# Allow running this test file directly as a script.
if __name__ == '__main__':
    unittest.main()
620