# Copyright 2009 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Uses :py:class:`FakeIoModule` to provide a
fake ``io`` module replacement.
"""
import io
import os
import sys
import traceback
from enum import Enum
from typing import (
    List,
    Optional,
    Callable,
    Union,
    Any,
    AnyStr,
    IO,
    TYPE_CHECKING,
)

from pyfakefs.fake_file import AnyFileWrapper
from pyfakefs.fake_open import FakeFileOpen
from pyfakefs.helpers import IS_PYPY

if TYPE_CHECKING:
    from pyfakefs.fake_filesystem import FakeFilesystem


class PatchMode(Enum):
    """Defines if patching shall be on, off, or in automatic mode.
    Currently only used for `patch_open_code` option.
    """

    OFF = 1
    AUTO = 2
    ON = 3


class FakeIoModule:
    """Uses FakeFilesystem to provide a fake io module replacement.

    You need a fake_filesystem to use this:
    filesystem = fake_filesystem.FakeFilesystem()
    my_io_module = fake_io.FakeIoModule(filesystem)
    """

    @staticmethod
    def dir() -> List[str]:
        """Return the list of patched function names. Used for patching
        functions imported from the module.
        """
        _dir = ["open"]
        if sys.version_info >= (3, 8):
            _dir.append("open_code")
        return _dir

    def __init__(self, filesystem: "FakeFilesystem"):
        """
        Args:
            filesystem: FakeFilesystem used to provide file system information.
        """
        self.filesystem = filesystem
        # Module names whose calls to `open` are passed to the real `io.open`.
        self.skip_names: List[str] = []
        self._io_module = io

    def open(
        self,
        file: Union[AnyStr, int],
        mode: str = "r",
        buffering: int = -1,
        encoding: Optional[str] = None,
        errors: Optional[str] = None,
        newline: Optional[str] = None,
        closefd: bool = True,
        opener: Optional[Callable] = None,
    ) -> Union[AnyFileWrapper, IO[Any]]:
        """Redirect the call to FakeFileOpen.
        See FakeFileOpen.call() for description.

        If the direct caller's file name maps to one of the module names in
        `self.skip_names`, the call is passed to the real `io.open` instead.
        """
        # workaround for built-in open called from skipped modules (see #552)
        # as open is not imported explicitly, we cannot patch it for
        # specific modules; instead we check if the caller is a skipped
        # module (should work in most cases)
        # With limit=2 the stack holds the caller (index 0) and this frame,
        # so this lookup must stay directly inside `open`.
        stack = traceback.extract_stack(limit=2)
        module_name = os.path.splitext(stack[0].filename)[0]
        module_name = module_name.replace(os.sep, ".")
        # generator expression: stops at the first match, builds no list
        if any(
            module_name == sn or module_name.endswith("." + sn)
            for sn in self.skip_names
        ):
            return io.open(  # pytype: disable=wrong-arg-count
                file,
                mode,
                buffering,
                encoding,
                errors,
                newline,
                closefd,
                opener,
            )
        fake_open = FakeFileOpen(self.filesystem)
        return fake_open(
            file, mode, buffering, encoding, errors, newline, closefd, opener
        )

    if sys.version_info >= (3, 8):

        def open_code(self, path):
            """Redirect the call to open. Note that the behavior of the real
            function may be overridden by an earlier call to the
            PyFile_SetOpenCodeHook(). This behavior is not reproduced here.

            The fake `open` is used if `patch_open_code` of the filesystem
            is `PatchMode.ON`, or if it is `PatchMode.AUTO` and `path` exists
            in the fake filesystem; otherwise the real `io.open_code` is
            called.

            Raises:
                TypeError: if `path` is not a `str` (not checked under PyPy).
            """
            if not isinstance(path, str) and not IS_PYPY:
                # report the actual type of the argument instead of a
                # hard-coded "int"
                raise TypeError(
                    "open_code() argument 'path' must be str, "
                    f"not {type(path).__name__}"
                )
            patch_mode = self.filesystem.patch_open_code
            # `exists` is only evaluated in AUTO mode
            use_fake_open = patch_mode == PatchMode.ON or (
                patch_mode == PatchMode.AUTO and self.filesystem.exists(path)
            )
            if use_fake_open:
                return self.open(path, mode="rb")
            # mostly this is used for compiled code -
            # don't patch these, as the files are probably in the real fs
            return self._io_module.open_code(path)

    def __getattr__(self, name):
        """Forwards any unfaked calls to the standard io module."""
        return getattr(self._io_module, name)


if sys.platform != "win32":
    import fcntl

    class FakeFcntlModule:
        """Replaces the fcntl module. Only valid under Linux/MacOS,
        currently just mocks the functionality away.
        """

        @staticmethod
        def dir() -> List[str]:
            """Return the list of patched function names. Used for patching
            functions imported from the module.
            """
            return ["fcntl", "ioctl", "flock", "lockf"]

        def __init__(self, filesystem: "FakeFilesystem"):
            """
            Args:
                filesystem: FakeFilesystem used to provide file system
                    information (currently not used).
            """
            self.filesystem = filesystem
            self._fcntl_module = fcntl

        def fcntl(
            self, fd: int, cmd: int, arg: Union[int, bytes] = 0
        ) -> Union[int, bytes]:
            """Fake `fcntl.fcntl` that performs no operation.

            Returns:
                0 if `arg` is an int, otherwise `arg` itself.
            """
            return 0 if isinstance(arg, int) else arg

        def ioctl(
            self,
            fd: int,
            request: int,
            arg: Union[int, bytes] = 0,
            mutate_flag: bool = True,
        ) -> Union[int, bytes]:
            """Fake `fcntl.ioctl` that performs no operation.

            Returns:
                0 if `arg` is an int, otherwise `arg` itself.
            """
            return 0 if isinstance(arg, int) else arg

        def flock(self, fd: int, operation: int) -> None:
            """Fake `fcntl.flock` that does nothing."""
            pass

        def lockf(
            self, fd: int, cmd: int, len: int = 0, start: int = 0, whence=0
        ) -> Any:
            """Fake `fcntl.lockf` that does nothing and returns None."""
            pass

        def __getattr__(self, name):
            """Forwards any unfaked calls to the standard fcntl module."""
            return getattr(self._fcntl_module, name)