1# Copyright 2009 Google Inc. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15""" Uses :py:class:`FakeIoModule` to provide a
16    fake ``io`` module replacement.
17"""
18import io
19import os
20import sys
21import traceback
22from enum import Enum
23from typing import (
24    List,
25    Optional,
26    Callable,
27    Union,
28    Any,
29    AnyStr,
30    IO,
31    TYPE_CHECKING,
32)
33
34from pyfakefs.fake_file import AnyFileWrapper
35from pyfakefs.fake_open import FakeFileOpen
36from pyfakefs.helpers import IS_PYPY
37
38if TYPE_CHECKING:
39    from pyfakefs.fake_filesystem import FakeFilesystem
40
41
class PatchMode(Enum):
    """Three-state switch for a patching option: off, automatic, or on.

    At present only the `patch_open_code` option makes use of it.
    """

    # Patching is disabled.
    OFF = 1
    # Patching is decided case by case (for `open_code`: only if the
    # requested path exists in the fake filesystem).
    AUTO = 2
    # Patching is always applied.
    ON = 3
50
51
class FakeIoModule:
    """Uses FakeFilesystem to provide a fake io module replacement.

    Only ``open`` (and, under Python >= 3.8, ``open_code``) are faked; every
    other attribute is looked up in the real ``io`` module.

    You need a fake_filesystem to use this:
    filesystem = fake_filesystem.FakeFilesystem()
    my_io_module = fake_io.FakeIoModule(filesystem)
    """

    @staticmethod
    def dir() -> List[str]:
        """Return the list of patched function names. Used for patching
        functions imported from the module.
        """
        _dir = ["open"]
        if sys.version_info >= (3, 8):
            _dir.append("open_code")
        return _dir

    def __init__(self, filesystem: "FakeFilesystem"):
        """
        Args:
            filesystem: FakeFilesystem used to provide file system information.
        """
        self.filesystem = filesystem
        # names of modules whose calls to `open` are passed on to the real
        # `io.open` instead of the fake implementation (see `open`)
        self.skip_names: List[str] = []
        self._io_module = io

    def open(
        self,
        file: Union[AnyStr, int],
        mode: str = "r",
        buffering: int = -1,
        encoding: Optional[str] = None,
        errors: Optional[str] = None,
        newline: Optional[str] = None,
        closefd: bool = True,
        opener: Optional[Callable] = None,
    ) -> Union[AnyFileWrapper, IO[Any]]:
        """Redirect the call to FakeFileOpen.
        See FakeFileOpen.call() for description.

        If the calling module is listed in `skip_names`, the call is passed
        to the real `io.open` instead, with the same arguments.

        Returns:
            The object returned by `FakeFileOpen`, or the real file object
            returned by `io.open` if the caller is a skipped module.
        """
        # workaround for built-in open called from skipped modules (see #552)
        # as open is not imported explicitly, we cannot patch it for
        # specific modules; instead we check if the caller is a skipped
        # module (should work in most cases)
        # limit=2 keeps only the two innermost frames (caller and this
        # method), so stack[0] is the frame that called `open`
        stack = traceback.extract_stack(limit=2)
        # turn the caller's file path into a dotted, module-like name
        module_name = os.path.splitext(stack[0].filename)[0]
        module_name = module_name.replace(os.sep, ".")
        # generator instead of a list, so `any` can stop at the first match
        if any(
            module_name == sn or module_name.endswith("." + sn)
            for sn in self.skip_names
        ):
            return io.open(  # pytype: disable=wrong-arg-count
                file,
                mode,
                buffering,
                encoding,
                errors,
                newline,
                closefd,
                opener,
            )
        fake_open = FakeFileOpen(self.filesystem)
        return fake_open(
            file, mode, buffering, encoding, errors, newline, closefd, opener
        )

    if sys.version_info >= (3, 8):

        def open_code(self, path):
            """Redirect the call to open. Note that the behavior of the real
            function may be overridden by an earlier call to the
            PyFile_SetOpenCodeHook(). This behavior is not reproduced here.

            Args:
                path: Path of the file to open; must be a `str`
                    (not checked under PyPy).

            Returns:
                The result of `self.open(path, mode="rb")` if the filesystem's
                `patch_open_code` is `PatchMode.ON`, or is `PatchMode.AUTO`
                and `path` exists in the fake filesystem; otherwise the
                result of the real `io.open_code`.

            Raises:
                TypeError: if `path` is not a `str` (except under PyPy).
            """
            if not isinstance(path, str) and not IS_PYPY:
                # report the actual argument type instead of a hard-coded
                # "int"; like CPython, None is shown as "None"
                type_name = "None" if path is None else type(path).__name__
                raise TypeError(
                    "open_code() argument 'path' must be str, "
                    f"not {type_name}"
                )
            patch_mode = self.filesystem.patch_open_code
            if (
                patch_mode == PatchMode.AUTO and self.filesystem.exists(path)
            ) or patch_mode == PatchMode.ON:
                return self.open(path, mode="rb")
            # mostly this is used for compiled code -
            # don't patch these, as the files are probably in the real fs
            return self._io_module.open_code(path)

    def __getattr__(self, name):
        """Forwards any unfaked calls to the standard io module."""
        return getattr(self._io_module, name)
144
145
if sys.platform != "win32":
    import fcntl

    class FakeFcntlModule:
        """Stand-in for the ``fcntl`` module; only defined when the
        platform is not Windows.

        The faked functions do not perform any real operation: they ignore
        the file descriptor and just return a placeholder result. All other
        attributes are taken from the real ``fcntl`` module.
        """

        @staticmethod
        def dir() -> List[str]:
            """Return the names of the functions replaced by this fake.
            Used for patching functions imported from the module.
            """
            patched_names = ["fcntl", "ioctl", "flock", "lockf"]
            return patched_names

        def __init__(self, filesystem: "FakeFilesystem"):
            """
            Args:
                filesystem: FakeFilesystem used to provide file system
                    information (currently not used).
            """
            # the real module, used as fallback for everything not faked
            self._fcntl_module = fcntl
            self.filesystem = filesystem

        def fcntl(self, fd: int, cmd: int, arg: int = 0) -> Union[int, bytes]:
            """Fake of ``fcntl.fcntl``: does nothing with `fd` and `cmd`.

            Returns:
                0 for an integer `arg`, otherwise `arg` itself.
            """
            if isinstance(arg, int):
                return 0
            return arg

        def ioctl(
            self, fd: int, request: int, arg: int = 0, mutate_flag: bool = True
        ) -> Union[int, bytes]:
            """Fake of ``fcntl.ioctl``: does nothing with `fd`, `request`
            and `mutate_flag`.

            Returns:
                0 for an integer `arg`, otherwise `arg` itself.
            """
            if isinstance(arg, int):
                return 0
            return arg

        def flock(self, fd: int, operation: int) -> None:
            """Fake of ``fcntl.flock``: no lock is taken or released."""
            return None

        def lockf(
            self, fd: int, cmd: int, len: int = 0, start: int = 0, whence=0
        ) -> Any:
            """Fake of ``fcntl.lockf``: no lock is taken or released,
            and None is returned.
            """
            return None

        def __getattr__(self, name):
            """Forwards any unfaked calls to the standard fcntl module."""
            real_module = self._fcntl_module
            return getattr(real_module, name)
189