1# Adapted from test_file.py by Daniel Stutzbach 2 3import sys 4import os 5import io 6import errno 7import unittest 8from array import array 9from weakref import proxy 10from functools import wraps 11 12from test.support import ( 13 cpython_only, swap_attr, gc_collect, is_emscripten, is_wasi 14) 15from test.support.os_helper import ( 16 TESTFN, TESTFN_ASCII, TESTFN_UNICODE, make_bad_fd, 17 ) 18from test.support.warnings_helper import check_warnings 19from collections import UserList 20 21import _io # C implementation of io 22import _pyio # Python implementation of io 23 24 25class AutoFileTests: 26 # file tests for which a test file is automatically set up 27 28 def setUp(self): 29 self.f = self.FileIO(TESTFN, 'w') 30 31 def tearDown(self): 32 if self.f: 33 self.f.close() 34 os.remove(TESTFN) 35 36 def testWeakRefs(self): 37 # verify weak references 38 p = proxy(self.f) 39 p.write(bytes(range(10))) 40 self.assertEqual(self.f.tell(), p.tell()) 41 self.f.close() 42 self.f = None 43 gc_collect() # For PyPy or other GCs. 44 self.assertRaises(ReferenceError, getattr, p, 'tell') 45 46 def testSeekTell(self): 47 self.f.write(bytes(range(20))) 48 self.assertEqual(self.f.tell(), 20) 49 self.f.seek(0) 50 self.assertEqual(self.f.tell(), 0) 51 self.f.seek(10) 52 self.assertEqual(self.f.tell(), 10) 53 self.f.seek(5, 1) 54 self.assertEqual(self.f.tell(), 15) 55 self.f.seek(-5, 1) 56 self.assertEqual(self.f.tell(), 10) 57 self.f.seek(-5, 2) 58 self.assertEqual(self.f.tell(), 15) 59 60 def testAttributes(self): 61 # verify expected attributes exist 62 f = self.f 63 64 self.assertEqual(f.mode, "wb") 65 self.assertEqual(f.closed, False) 66 67 # verify the attributes are readonly 68 for attr in 'mode', 'closed': 69 self.assertRaises((AttributeError, TypeError), 70 setattr, f, attr, 'oops') 71 72 @unittest.skipIf(is_wasi, "WASI does not expose st_blksize.") 73 def testBlksize(self): 74 # test private _blksize attribute 75 blksize = io.DEFAULT_BUFFER_SIZE 76 # try to get preferred blksize from stat.st_blksize, if available 77 if hasattr(os, 'fstat'): 78 fst = os.fstat(self.f.fileno()) 79 blksize = getattr(fst, 'st_blksize', blksize) 80 self.assertEqual(self.f._blksize, blksize) 81 82 # verify readinto 83 def testReadintoByteArray(self): 84 self.f.write(bytes([1, 2, 0, 255])) 85 self.f.close() 86 87 ba = bytearray(b'abcdefgh') 88 with self.FileIO(TESTFN, 'r') as f: 89 n = f.readinto(ba) 90 self.assertEqual(ba, b'\x01\x02\x00\xffefgh') 91 self.assertEqual(n, 4) 92 93 def _testReadintoMemoryview(self): 94 self.f.write(bytes([1, 2, 0, 255])) 95 self.f.close() 96 97 m = memoryview(bytearray(b'abcdefgh')) 98 with self.FileIO(TESTFN, 'r') as f: 99 n = f.readinto(m) 100 self.assertEqual(m, b'\x01\x02\x00\xffefgh') 101 self.assertEqual(n, 4) 102 103 m = memoryview(bytearray(b'abcdefgh')).cast('H', shape=[2, 2]) 104 with self.FileIO(TESTFN, 'r') as f: 105 n = f.readinto(m) 106 self.assertEqual(bytes(m), b'\x01\x02\x00\xffefgh') 107 self.assertEqual(n, 4) 108 109 def _testReadintoArray(self): 110 self.f.write(bytes([1, 2, 0, 255])) 111 self.f.close() 112 113 a = array('B', b'abcdefgh') 114 with self.FileIO(TESTFN, 'r') as f: 115 n = f.readinto(a) 116 self.assertEqual(a, array('B', [1, 2, 0, 255, 101, 102, 103, 104])) 117 self.assertEqual(n, 4) 118 119 a = array('b', b'abcdefgh') 120 with self.FileIO(TESTFN, 'r') as f: 121 n = f.readinto(a) 122 self.assertEqual(a, array('b', [1, 2, 0, -1, 101, 102, 103, 104])) 123 self.assertEqual(n, 4) 124 125 a = array('I', b'abcdefgh') 126 with self.FileIO(TESTFN, 'r') as f: 127 n = f.readinto(a) 128 self.assertEqual(a, array('I', b'\x01\x02\x00\xffefgh')) 129 self.assertEqual(n, 4) 130 131 def testWritelinesList(self): 132 l = [b'123', b'456'] 133 self.f.writelines(l) 134 self.f.close() 135 self.f = self.FileIO(TESTFN, 'rb') 136 buf = self.f.read() 137 self.assertEqual(buf, b'123456') 138 139 def testWritelinesUserList(self): 140 l = UserList([b'123', b'456']) 141 self.f.writelines(l) 142 self.f.close() 143 self.f = self.FileIO(TESTFN, 'rb') 144 buf = self.f.read() 145 self.assertEqual(buf, b'123456') 146 147 def testWritelinesError(self): 148 self.assertRaises(TypeError, self.f.writelines, [1, 2, 3]) 149 self.assertRaises(TypeError, self.f.writelines, None) 150 self.assertRaises(TypeError, self.f.writelines, "abc") 151 152 def test_none_args(self): 153 self.f.write(b"hi\nbye\nabc") 154 self.f.close() 155 self.f = self.FileIO(TESTFN, 'r') 156 self.assertEqual(self.f.read(None), b"hi\nbye\nabc") 157 self.f.seek(0) 158 self.assertEqual(self.f.readline(None), b"hi\n") 159 self.assertEqual(self.f.readlines(None), [b"bye\n", b"abc"]) 160 161 def test_reject(self): 162 self.assertRaises(TypeError, self.f.write, "Hello!") 163 164 def testRepr(self): 165 self.assertEqual(repr(self.f), 166 "<%s.FileIO name=%r mode=%r closefd=True>" % 167 (self.modulename, self.f.name, self.f.mode)) 168 del self.f.name 169 self.assertEqual(repr(self.f), 170 "<%s.FileIO fd=%r mode=%r closefd=True>" % 171 (self.modulename, self.f.fileno(), self.f.mode)) 172 self.f.close() 173 self.assertEqual(repr(self.f), 174 "<%s.FileIO [closed]>" % (self.modulename,)) 175 176 def testReprNoCloseFD(self): 177 fd = os.open(TESTFN, os.O_RDONLY) 178 try: 179 with self.FileIO(fd, 'r', closefd=False) as f: 180 self.assertEqual(repr(f), 181 "<%s.FileIO name=%r mode=%r closefd=False>" % 182 (self.modulename, f.name, f.mode)) 183 finally: 184 os.close(fd) 185 186 def testRecursiveRepr(self): 187 # Issue #25455 188 with swap_attr(self.f, 'name', self.f): 189 with self.assertRaises(RuntimeError): 190 repr(self.f) # Should not crash 191 192 def testErrors(self): 193 f = self.f 194 self.assertFalse(f.isatty()) 195 self.assertFalse(f.closed) 196 #self.assertEqual(f.name, TESTFN) 197 self.assertRaises(ValueError, f.read, 10) # Open for reading 198 f.close() 199 self.assertTrue(f.closed) 200 f = self.FileIO(TESTFN, 'r') 201 self.assertRaises(TypeError, f.readinto, "") 202 self.assertFalse(f.closed) 203 f.close() 204 self.assertTrue(f.closed) 205 206 def testMethods(self): 207 methods = ['fileno', 'isatty', 'seekable', 'readable', 'writable', 208 'read', 'readall', 'readline', 'readlines', 209 'tell', 'truncate', 'flush'] 210 211 self.f.close() 212 self.assertTrue(self.f.closed) 213 214 for methodname in methods: 215 method = getattr(self.f, methodname) 216 # should raise on closed file 217 self.assertRaises(ValueError, method) 218 219 self.assertRaises(TypeError, self.f.readinto) 220 self.assertRaises(ValueError, self.f.readinto, bytearray(1)) 221 self.assertRaises(TypeError, self.f.seek) 222 self.assertRaises(ValueError, self.f.seek, 0) 223 self.assertRaises(TypeError, self.f.write) 224 self.assertRaises(ValueError, self.f.write, b'') 225 self.assertRaises(TypeError, self.f.writelines) 226 self.assertRaises(ValueError, self.f.writelines, b'') 227 228 def testOpendir(self): 229 # Issue 3703: opening a directory should fill the errno 230 # Windows always returns "[Errno 13]: Permission denied 231 # Unix uses fstat and returns "[Errno 21]: Is a directory" 232 try: 233 self.FileIO('.', 'r') 234 except OSError as e: 235 self.assertNotEqual(e.errno, 0) 236 self.assertEqual(e.filename, ".") 237 else: 238 self.fail("Should have raised OSError") 239 240 @unittest.skipIf(os.name == 'nt', "test only works on a POSIX-like system") 241 def testOpenDirFD(self): 242 fd = os.open('.', os.O_RDONLY) 243 with self.assertRaises(OSError) as cm: 244 self.FileIO(fd, 'r') 245 os.close(fd) 246 self.assertEqual(cm.exception.errno, errno.EISDIR) 247 248 #A set of functions testing that we get expected behaviour if someone has 249 #manually closed the internal file descriptor. First, a decorator: 250 def ClosedFD(func): 251 @wraps(func) 252 def wrapper(self): 253 #forcibly close the fd before invoking the problem function 254 f = self.f 255 os.close(f.fileno()) 256 try: 257 func(self, f) 258 finally: 259 try: 260 self.f.close() 261 except OSError: 262 pass 263 return wrapper 264 265 def ClosedFDRaises(func): 266 @wraps(func) 267 def wrapper(self): 268 #forcibly close the fd before invoking the problem function 269 f = self.f 270 os.close(f.fileno()) 271 try: 272 func(self, f) 273 except OSError as e: 274 self.assertEqual(e.errno, errno.EBADF) 275 else: 276 self.fail("Should have raised OSError") 277 finally: 278 try: 279 self.f.close() 280 except OSError: 281 pass 282 return wrapper 283 284 @ClosedFDRaises 285 def testErrnoOnClose(self, f): 286 f.close() 287 288 @ClosedFDRaises 289 def testErrnoOnClosedWrite(self, f): 290 f.write(b'a') 291 292 @ClosedFDRaises 293 def testErrnoOnClosedSeek(self, f): 294 f.seek(0) 295 296 @ClosedFDRaises 297 def testErrnoOnClosedTell(self, f): 298 f.tell() 299 300 @ClosedFDRaises 301 def testErrnoOnClosedTruncate(self, f): 302 f.truncate(0) 303 304 @ClosedFD 305 def testErrnoOnClosedSeekable(self, f): 306 f.seekable() 307 308 @ClosedFD 309 def testErrnoOnClosedReadable(self, f): 310 f.readable() 311 312 @ClosedFD 313 def testErrnoOnClosedWritable(self, f): 314 f.writable() 315 316 @ClosedFD 317 def testErrnoOnClosedFileno(self, f): 318 f.fileno() 319 320 @ClosedFD 321 def testErrnoOnClosedIsatty(self, f): 322 self.assertEqual(f.isatty(), False) 323 324 def ReopenForRead(self): 325 try: 326 self.f.close() 327 except OSError: 328 pass 329 self.f = self.FileIO(TESTFN, 'r') 330 os.close(self.f.fileno()) 331 return self.f 332 333 @ClosedFDRaises 334 def testErrnoOnClosedRead(self, f): 335 f = self.ReopenForRead() 336 f.read(1) 337 338 @ClosedFDRaises 339 def testErrnoOnClosedReadall(self, f): 340 f = self.ReopenForRead() 341 f.readall() 342 343 @ClosedFDRaises 344 def testErrnoOnClosedReadinto(self, f): 345 f = self.ReopenForRead() 346 a = array('b', b'x'*10) 347 f.readinto(a) 348 349class CAutoFileTests(AutoFileTests, unittest.TestCase): 350 FileIO = _io.FileIO 351 modulename = '_io' 352 353class PyAutoFileTests(AutoFileTests, unittest.TestCase): 354 FileIO = _pyio.FileIO 355 modulename = '_pyio' 356 357 358class OtherFileTests: 359 360 def testAbles(self): 361 try: 362 f = self.FileIO(TESTFN, "w") 363 self.assertEqual(f.readable(), False) 364 self.assertEqual(f.writable(), True) 365 self.assertEqual(f.seekable(), True) 366 f.close() 367 368 f = self.FileIO(TESTFN, "r") 369 self.assertEqual(f.readable(), True) 370 self.assertEqual(f.writable(), False) 371 self.assertEqual(f.seekable(), True) 372 f.close() 373 374 f = self.FileIO(TESTFN, "a+") 375 self.assertEqual(f.readable(), True) 376 self.assertEqual(f.writable(), True) 377 self.assertEqual(f.seekable(), True) 378 self.assertEqual(f.isatty(), False) 379 f.close() 380 381 if sys.platform != "win32" and not is_emscripten: 382 try: 383 f = self.FileIO("/dev/tty", "a") 384 except OSError: 385 # When run in a cron job there just aren't any 386 # ttys, so skip the test. This also handles other 387 # OS'es that don't support /dev/tty. 388 pass 389 else: 390 self.assertEqual(f.readable(), False) 391 self.assertEqual(f.writable(), True) 392 if sys.platform != "darwin" and \ 393 'bsd' not in sys.platform and \ 394 not sys.platform.startswith(('sunos', 'aix')): 395 # Somehow /dev/tty appears seekable on some BSDs 396 self.assertEqual(f.seekable(), False) 397 self.assertEqual(f.isatty(), True) 398 f.close() 399 finally: 400 os.unlink(TESTFN) 401 402 def testInvalidModeStrings(self): 403 # check invalid mode strings 404 for mode in ("", "aU", "wU+", "rw", "rt"): 405 try: 406 f = self.FileIO(TESTFN, mode) 407 except ValueError: 408 pass 409 else: 410 f.close() 411 self.fail('%r is an invalid file mode' % mode) 412 413 def testModeStrings(self): 414 # test that the mode attribute is correct for various mode strings 415 # given as init args 416 try: 417 for modes in [('w', 'wb'), ('wb', 'wb'), ('wb+', 'rb+'), 418 ('w+b', 'rb+'), ('a', 'ab'), ('ab', 'ab'), 419 ('ab+', 'ab+'), ('a+b', 'ab+'), ('r', 'rb'), 420 ('rb', 'rb'), ('rb+', 'rb+'), ('r+b', 'rb+')]: 421 # read modes are last so that TESTFN will exist first 422 with self.FileIO(TESTFN, modes[0]) as f: 423 self.assertEqual(f.mode, modes[1]) 424 finally: 425 if os.path.exists(TESTFN): 426 os.unlink(TESTFN) 427 428 def testUnicodeOpen(self): 429 # verify repr works for unicode too 430 f = self.FileIO(str(TESTFN), "w") 431 f.close() 432 os.unlink(TESTFN) 433 434 def testBytesOpen(self): 435 # Opening a bytes filename 436 fn = TESTFN_ASCII.encode("ascii") 437 f = self.FileIO(fn, "w") 438 try: 439 f.write(b"abc") 440 f.close() 441 with open(TESTFN_ASCII, "rb") as f: 442 self.assertEqual(f.read(), b"abc") 443 finally: 444 os.unlink(TESTFN_ASCII) 445 446 @unittest.skipIf(sys.getfilesystemencoding() != 'utf-8', 447 "test only works for utf-8 filesystems") 448 def testUtf8BytesOpen(self): 449 # Opening a UTF-8 bytes filename 450 try: 451 fn = TESTFN_UNICODE.encode("utf-8") 452 except UnicodeEncodeError: 453 self.skipTest('could not encode %r to utf-8' % TESTFN_UNICODE) 454 f = self.FileIO(fn, "w") 455 try: 456 f.write(b"abc") 457 f.close() 458 with open(TESTFN_UNICODE, "rb") as f: 459 self.assertEqual(f.read(), b"abc") 460 finally: 461 os.unlink(TESTFN_UNICODE) 462 463 def testConstructorHandlesNULChars(self): 464 fn_with_NUL = 'foo\0bar' 465 self.assertRaises(ValueError, self.FileIO, fn_with_NUL, 'w') 466 self.assertRaises(ValueError, self.FileIO, bytes(fn_with_NUL, 'ascii'), 'w') 467 468 def testInvalidFd(self): 469 self.assertRaises(ValueError, self.FileIO, -10) 470 self.assertRaises(OSError, self.FileIO, make_bad_fd()) 471 if sys.platform == 'win32': 472 import msvcrt 473 self.assertRaises(OSError, msvcrt.get_osfhandle, make_bad_fd()) 474 475 def testBadModeArgument(self): 476 # verify that we get a sensible error message for bad mode argument 477 bad_mode = "qwerty" 478 try: 479 f = self.FileIO(TESTFN, bad_mode) 480 except ValueError as msg: 481 if msg.args[0] != 0: 482 s = str(msg) 483 if TESTFN in s or bad_mode not in s: 484 self.fail("bad error message for invalid mode: %s" % s) 485 # if msg.args[0] == 0, we're probably on Windows where there may be 486 # no obvious way to discover why open() failed. 487 else: 488 f.close() 489 self.fail("no error for invalid mode: %s" % bad_mode) 490 491 def testTruncate(self): 492 f = self.FileIO(TESTFN, 'w') 493 f.write(bytes(bytearray(range(10)))) 494 self.assertEqual(f.tell(), 10) 495 f.truncate(5) 496 self.assertEqual(f.tell(), 10) 497 self.assertEqual(f.seek(0, io.SEEK_END), 5) 498 f.truncate(15) 499 self.assertEqual(f.tell(), 5) 500 self.assertEqual(f.seek(0, io.SEEK_END), 15) 501 f.close() 502 503 def testTruncateOnWindows(self): 504 def bug801631(): 505 # SF bug <http://www.python.org/sf/801631> 506 # "file.truncate fault on windows" 507 f = self.FileIO(TESTFN, 'w') 508 f.write(bytes(range(11))) 509 f.close() 510 511 f = self.FileIO(TESTFN,'r+') 512 data = f.read(5) 513 if data != bytes(range(5)): 514 self.fail("Read on file opened for update failed %r" % data) 515 if f.tell() != 5: 516 self.fail("File pos after read wrong %d" % f.tell()) 517 518 f.truncate() 519 if f.tell() != 5: 520 self.fail("File pos after ftruncate wrong %d" % f.tell()) 521 522 f.close() 523 size = os.path.getsize(TESTFN) 524 if size != 5: 525 self.fail("File size after ftruncate wrong %d" % size) 526 527 try: 528 bug801631() 529 finally: 530 os.unlink(TESTFN) 531 532 def testAppend(self): 533 try: 534 f = open(TESTFN, 'wb') 535 f.write(b'spam') 536 f.close() 537 f = open(TESTFN, 'ab') 538 f.write(b'eggs') 539 f.close() 540 f = open(TESTFN, 'rb') 541 d = f.read() 542 f.close() 543 self.assertEqual(d, b'spameggs') 544 finally: 545 try: 546 os.unlink(TESTFN) 547 except: 548 pass 549 550 def testInvalidInit(self): 551 self.assertRaises(TypeError, self.FileIO, "1", 0, 0) 552 553 def testWarnings(self): 554 with check_warnings(quiet=True) as w: 555 self.assertEqual(w.warnings, []) 556 self.assertRaises(TypeError, self.FileIO, []) 557 self.assertEqual(w.warnings, []) 558 self.assertRaises(ValueError, self.FileIO, "/some/invalid/name", "rt") 559 self.assertEqual(w.warnings, []) 560 561 def testUnclosedFDOnException(self): 562 class MyException(Exception): pass 563 class MyFileIO(self.FileIO): 564 def __setattr__(self, name, value): 565 if name == "name": 566 raise MyException("blocked setting name") 567 return super(MyFileIO, self).__setattr__(name, value) 568 fd = os.open(__file__, os.O_RDONLY) 569 self.assertRaises(MyException, MyFileIO, fd) 570 os.close(fd) # should not raise OSError(EBADF) 571 572 573class COtherFileTests(OtherFileTests, unittest.TestCase): 574 FileIO = _io.FileIO 575 modulename = '_io' 576 577 @cpython_only 578 def testInvalidFd_overflow(self): 579 # Issue 15989 580 import _testcapi 581 self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MAX + 1) 582 self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MIN - 1) 583 584 def test_open_code(self): 585 # Check that the default behaviour of open_code matches 586 # open("rb") 587 with self.FileIO(__file__, "rb") as f: 588 expected = f.read() 589 with _io.open_code(__file__) as f: 590 actual = f.read() 591 self.assertEqual(expected, actual) 592 593 594class PyOtherFileTests(OtherFileTests, unittest.TestCase): 595 FileIO = _pyio.FileIO 596 modulename = '_pyio' 597 598 def test_open_code(self): 599 # Check that the default behaviour of open_code matches 600 # open("rb") 601 with self.FileIO(__file__, "rb") as f: 602 expected = f.read() 603 with check_warnings(quiet=True) as w: 604 # Always test _open_code_with_warning 605 with _pyio._open_code_with_warning(__file__) as f: 606 actual = f.read() 607 self.assertEqual(expected, actual) 608 self.assertNotEqual(w.warnings, []) 609 610 611def tearDownModule(): 612 # Historically, these tests have been sloppy about removing TESTFN. 613 # So get rid of it no matter what. 614 if os.path.exists(TESTFN): 615 os.unlink(TESTFN) 616 617 618if __name__ == '__main__': 619 unittest.main() 620