1""":module: watchdog.utils.dirsnapshot
2:synopsis: Directory snapshots and comparison.
3:author: [email protected] (Yesudeep Mangalapilly)
4:author: [email protected] (Mickaël Schoentgen)
5
6.. ADMONITION:: Where are the moved events? They "disappeared"
7
8        This implementation does not take partition boundaries
9        into consideration. It will only work when the directory
10        tree is entirely on the same file system. More specifically,
11        any part of the code that depends on inode numbers can
12        break if partition boundaries are crossed. In these cases,
13        the snapshot diff will represent file/directory movement as
14        created and deleted events.
15
16Classes
17-------
18.. autoclass:: DirectorySnapshot
19   :members:
20   :show-inheritance:
21
22.. autoclass:: DirectorySnapshotDiff
23   :members:
24   :show-inheritance:
25
26.. autoclass:: EmptyDirectorySnapshot
27   :members:
28   :show-inheritance:
29
30"""
31
32from __future__ import annotations
33
34import contextlib
35import errno
36import os
37from stat import S_ISDIR
38from typing import TYPE_CHECKING
39
40if TYPE_CHECKING:
41    from collections.abc import Iterator
42    from typing import Any, Callable
43
44
class DirectorySnapshotDiff:
    """Compares two directory snapshots and creates an object that represents
    the difference between the two snapshots.

    Paths are classified as created, deleted, moved or modified, and each
    category is split into files and directories.

    :param ref:
        The reference directory snapshot.
    :type ref:
        :class:`DirectorySnapshot`
    :param snapshot:
        The directory snapshot which will be compared
        with the reference snapshot.
    :type snapshot:
        :class:`DirectorySnapshot`
    :param ignore_device:
        A boolean indicating whether to ignore the device id or not.
        By default, a file may be uniquely identified by a combination of its
        inode and its device id. The problem is that the device id may (or may not)
        change between system boots. This problem would cause the DirectorySnapshotDiff
        to think a file has been deleted and created again but it would be the
        exact same file.
        Set to True only if you are sure you will always use the same device.
    :type ignore_device:
        :class:`bool`
    """

    def __init__(
        self,
        ref: DirectorySnapshot,
        snapshot: DirectorySnapshot,
        *,
        ignore_device: bool = False,
    ) -> None:
        # Read each snapshot's path set once: DirectorySnapshot.paths copies
        # the key set on every access.
        ref_paths = ref.paths
        snapshot_paths = snapshot.paths
        common_paths = ref_paths & snapshot_paths

        created = snapshot_paths - ref_paths
        deleted = ref_paths - snapshot_paths

        if ignore_device:

            def get_inode(directory: DirectorySnapshot, full_path: bytes | str) -> int | tuple[int, int]:
                # Compare on the inode number only, dropping the device id.
                return directory.inode(full_path)[0]

        else:

            def get_inode(directory: DirectorySnapshot, full_path: bytes | str) -> int | tuple[int, int]:
                return directory.inode(full_path)

        # A path present in both snapshots but with a different identity was
        # replaced: report it as both deleted and created.
        for path in common_paths:
            if get_inode(ref, path) != get_inode(snapshot, path):
                created.add(path)
                deleted.add(path)

        # find moved paths
        moved: set[tuple[bytes | str, bytes | str]] = set()
        # Iterate over copies because the sets are mutated inside the loops.
        for path in set(deleted):
            inode = ref.inode(path)
            new_path = snapshot.path(inode)
            if new_path:
                # file is not deleted but moved
                deleted.remove(path)
                moved.add((path, new_path))

        for path in set(created):
            inode = snapshot.inode(path)
            old_path = ref.path(inode)
            if old_path:
                # file is not created but moved
                created.remove(path)
                moved.add((old_path, path))

        # find modified paths
        # first check paths that have not moved
        modified: set[bytes | str] = set()
        for path in common_paths:
            if get_inode(ref, path) == get_inode(snapshot, path) and (
                ref.mtime(path) != snapshot.mtime(path) or ref.size(path) != snapshot.size(path)
            ):
                modified.add(path)

        # then moved paths; a moved-and-changed entry is recorded under its old path
        for old_path, new_path in moved:
            if ref.mtime(old_path) != snapshot.mtime(new_path) or ref.size(old_path) != snapshot.size(new_path):
                modified.add(old_path)

        # Directory-ness of created paths comes from the new snapshot; the
        # other categories are keyed by paths that exist in the reference.
        self._dirs_created = [path for path in created if snapshot.isdir(path)]
        self._dirs_deleted = [path for path in deleted if ref.isdir(path)]
        self._dirs_modified = [path for path in modified if ref.isdir(path)]
        self._dirs_moved = [(frm, to) for (frm, to) in moved if ref.isdir(frm)]

        # Whatever is not a directory is reported as a file.
        self._files_created = list(created - set(self._dirs_created))
        self._files_deleted = list(deleted - set(self._dirs_deleted))
        self._files_modified = list(modified - set(self._dirs_modified))
        self._files_moved = list(moved - set(self._dirs_moved))

    def __str__(self) -> str:
        return self.__repr__()

    def __repr__(self) -> str:
        """Return a summary holding the number of entries in each category."""
        fmt = (
            "<{0} files(created={1}, deleted={2}, modified={3}, moved={4}),"
            " folders(created={5}, deleted={6}, modified={7}, moved={8})>"
        )
        return fmt.format(
            type(self).__name__,
            len(self._files_created),
            len(self._files_deleted),
            len(self._files_modified),
            len(self._files_moved),
            len(self._dirs_created),
            len(self._dirs_deleted),
            len(self._dirs_modified),
            len(self._dirs_moved),
        )

    @property
    def files_created(self) -> list[bytes | str]:
        """List of files that were created."""
        return self._files_created

    @property
    def files_deleted(self) -> list[bytes | str]:
        """List of files that were deleted."""
        return self._files_deleted

    @property
    def files_modified(self) -> list[bytes | str]:
        """List of files that were modified."""
        return self._files_modified

    @property
    def files_moved(self) -> list[tuple[bytes | str, bytes | str]]:
        """List of files that were moved.

        Each event is a two-tuple the first item of which is the path
        that has been renamed to the second item in the tuple.
        """
        return self._files_moved

    @property
    def dirs_modified(self) -> list[bytes | str]:
        """List of directories that were modified."""
        return self._dirs_modified

    @property
    def dirs_moved(self) -> list[tuple[bytes | str, bytes | str]]:
        """List of directories that were moved.

        Each event is a two-tuple the first item of which is the path
        that has been renamed to the second item in the tuple.
        """
        return self._dirs_moved

    @property
    def dirs_deleted(self) -> list[bytes | str]:
        """List of directories that were deleted."""
        return self._dirs_deleted

    @property
    def dirs_created(self) -> list[bytes | str]:
        """List of directories that were created."""
        return self._dirs_created

    class ContextManager:
        """Context manager that creates two directory snapshots and a
        diff object that represents the difference between the two snapshots.

        A snapshot is taken on entry (``pre_snapshot``) and another on exit
        (``post_snapshot``); their difference is then stored in ``diff``.
        Entering returns the manager itself, so it can be bound with ``as``.

        :param path:
            The directory path for which a snapshot should be taken.
        :type path:
            ``str``
        :param recursive:
            ``True`` if the entire directory tree should be included in the
            snapshot; ``False`` otherwise.
        :type recursive:
            ``bool``
        :param stat:
            Use custom stat function that returns a stat structure for path.

            A function taking a ``path`` as argument which will be called
            for every entry in the directory tree.
        :param listdir:
            Use custom listdir function. For details see ``os.scandir``.
        :param ignore_device:
            A boolean indicating whether to ignore the device id or not.
            By default, a file may be uniquely identified by a combination of its
            inode and its device id. The problem is that the device id may (or may not)
            change between system boots. This problem would cause the DirectorySnapshotDiff
            to think a file has been deleted and created again but it would be the
            exact same file.
            Set to True only if you are sure you will always use the same device.
        :type ignore_device:
            :class:`bool`
        """

        def __init__(
            self,
            path: str,
            *,
            recursive: bool = True,
            stat: Callable[[str], os.stat_result] = os.stat,
            listdir: Callable[[str | None], Iterator[os.DirEntry]] = os.scandir,
            ignore_device: bool = False,
        ) -> None:
            self.path = path
            self.recursive = recursive
            self.stat = stat
            self.listdir = listdir
            self.ignore_device = ignore_device

        def __enter__(self) -> DirectorySnapshotDiff.ContextManager:
            """Take the reference snapshot and return this manager."""
            self.pre_snapshot = self.get_snapshot()
            # Returning self makes ``with ... as cm:`` usable; callers that
            # ignore the bound value are unaffected.
            return self

        def __exit__(self, *args: object) -> None:
            """Take the second snapshot and compute ``diff``.

            Returns ``None``, so an exception raised inside the ``with`` body
            is not suppressed.
            """
            self.post_snapshot = self.get_snapshot()
            self.diff = DirectorySnapshotDiff(
                self.pre_snapshot,
                self.post_snapshot,
                ignore_device=self.ignore_device,
            )

        def get_snapshot(self) -> DirectorySnapshot:
            """Build a fresh :class:`DirectorySnapshot` from the stored settings."""
            return DirectorySnapshot(
                path=self.path,
                recursive=self.recursive,
                stat=self.stat,
                listdir=self.listdir,
            )
270
271
class DirectorySnapshot:
    """Stat information captured for a directory and the entries beneath it.

    :param path:
        The directory path for which a snapshot should be taken.
    :type path:
        ``str``
    :param recursive:
        ``True`` if the entire directory tree should be included in the
        snapshot; ``False`` to record only the direct children of ``path``.
    :type recursive:
        ``bool``
    :param stat:
        Custom stat function, called with a path for the root and for every
        entry found. The snapshot reads ``st_ino``, ``st_dev``, ``st_mode``,
        ``st_mtime`` and ``st_size`` from its result.
    :param listdir:
        Custom listdir function. For details see ``os.scandir``.
    """

    def __init__(
        self,
        path: str,
        *,
        recursive: bool = True,
        stat: Callable[[str], os.stat_result] = os.stat,
        listdir: Callable[[str | None], Iterator[os.DirEntry]] = os.scandir,
    ) -> None:
        self.recursive = recursive
        self.stat = stat
        self.listdir = listdir

        # path -> stat result, and (st_ino, st_dev) -> path
        self._stat_info: dict[bytes | str, os.stat_result] = {}
        self._inode_to_path: dict[tuple[int, int], bytes | str] = {}

        # The root is recorded explicitly: walk() only yields what lies below it.
        root_stat = self.stat(path)
        self._stat_info[path] = root_stat
        self._inode_to_path[(root_stat.st_ino, root_stat.st_dev)] = path

        for entry_path, entry_stat in self.walk(path):
            self._inode_to_path[(entry_stat.st_ino, entry_stat.st_dev)] = entry_path
            self._stat_info[entry_path] = entry_stat

    def walk(self, root: str) -> Iterator[tuple[str, os.stat_result]]:
        """Yield ``(path, stat_result)`` for the entries found below ``root``.

        All direct children of ``root`` are yielded first; when the snapshot
        is recursive, each child directory is then walked in turn.
        """
        try:
            children = [os.path.join(root, item.name) for item in self.listdir(root)]
        except OSError as exc:
            # The directory may have vanished, or been replaced by a file of
            # the same name, after its parent listed it: treat it as empty.
            if exc.errno not in (errno.ENOENT, errno.ENOTDIR, errno.EINVAL):
                raise
            return

        seen = []
        for child in children:
            # Entries that cannot be stat'ed are silently left out.
            with contextlib.suppress(OSError):
                record = (child, self.stat(child))
                seen.append(record)
                yield record

        if not self.recursive:
            return

        for child, child_stat in seen:
            # A PermissionError raised while descending skips that subtree.
            with contextlib.suppress(PermissionError):
                if S_ISDIR(child_stat.st_mode):
                    yield from self.walk(child)

    @property
    def paths(self) -> set[bytes | str]:
        """Set of file/directory paths in the snapshot (a new set on each access)."""
        return set(self._stat_info)

    def path(self, uid: tuple[int, int]) -> bytes | str | None:
        """Return the path recorded for ``uid``, or ``None`` if it is unknown."""
        return self._inode_to_path.get(uid)

    def inode(self, path: bytes | str) -> tuple[int, int]:
        """Return the ``(st_ino, st_dev)`` id recorded for ``path``."""
        info = self._stat_info[path]
        return (info.st_ino, info.st_dev)

    def isdir(self, path: bytes | str) -> bool:
        """Tell whether ``path`` was a directory when the snapshot was taken."""
        info = self._stat_info[path]
        return S_ISDIR(info.st_mode)

    def mtime(self, path: bytes | str) -> float:
        """Return the ``st_mtime`` recorded for ``path``."""
        info = self._stat_info[path]
        return info.st_mtime

    def size(self, path: bytes | str) -> int:
        """Return the ``st_size`` recorded for ``path``."""
        info = self._stat_info[path]
        return info.st_size

    def stat_info(self, path: bytes | str) -> os.stat_result:
        """Return the stat information object recorded for ``path``.

        Attached information is subject to change. Do not use unless
        you specify `stat` in constructor. Use :func:`inode`, :func:`mtime`,
        :func:`isdir` instead.

        :param path:
            The path for which stat information should be obtained
            from a snapshot.
        """
        return self._stat_info[path]

    def __sub__(self, previous_dirsnap: DirectorySnapshot) -> DirectorySnapshotDiff:
        """Allow subtracting an earlier snapshot from this one.

        :returns:
            A :class:`DirectorySnapshotDiff` using ``previous_dirsnap`` as
            the reference and this snapshot as the newer state.
        """
        return DirectorySnapshotDiff(previous_dirsnap, self)

    def __str__(self) -> str:
        return repr(self)

    def __repr__(self) -> str:
        """Return the string form of the path -> stat mapping."""
        return str(self._stat_info)
395
396
class EmptyDirectorySnapshot(DirectorySnapshot):
    """Class to implement an empty snapshot. This is used together with
    DirectorySnapshot and DirectorySnapshotDiff in order to get all the files/folders
    in the directory as created.
    """

    def __init__(self) -> None:
        # DirectorySnapshot.__init__ is skipped on purpose: nothing is stat'ed
        # or walked. The lookup tables are still created, empty, so the
        # inherited accessors (__repr__, __str__, inode, isdir, mtime, size,
        # stat_info) work instead of failing with AttributeError.
        self._stat_info: dict[bytes | str, os.stat_result] = {}
        self._inode_to_path: dict[tuple[int, int], bytes | str] = {}

    @staticmethod
    def path(_: Any) -> None:
        """Mock up method to return the path of the received inode. As the snapshot
        is intended to be empty, it always returns None.

        :returns:
            None.
        """
        return None

    @property
    def paths(self) -> set[bytes | str]:
        """Mock up method to return a set of file/directory paths in the snapshot. As
        the snapshot is intended to be empty, it always returns an empty set.

        :returns:
            An empty set.
        """
        return set()
425