""":module: watchdog.utils.dirsnapshot
:synopsis: Directory snapshots and comparison.
:author: [email protected] (Yesudeep Mangalapilly)
:author: [email protected] (Mickaël Schoentgen)

.. ADMONITION:: Where are the moved events? They "disappeared"

        This implementation does not take partition boundaries
        into consideration. It will only work when the directory
        tree is entirely on the same file system. More specifically,
        any part of the code that depends on inode numbers can
        break if partition boundaries are crossed. In these cases,
        the snapshot diff will represent file/directory movement as
        created and deleted events.

Classes
-------
.. autoclass:: DirectorySnapshot
   :members:
   :show-inheritance:

.. autoclass:: DirectorySnapshotDiff
   :members:
   :show-inheritance:

.. autoclass:: EmptyDirectorySnapshot
   :members:
   :show-inheritance:

"""

from __future__ import annotations

import contextlib
import errno
import os
from stat import S_ISDIR
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from collections.abc import Iterator
    from typing import Any, Callable


class DirectorySnapshotDiff:
    """Compares two directory snapshots and creates an object that represents
    the difference between the two snapshots.

    :param ref:
        The reference directory snapshot.
    :type ref:
        :class:`DirectorySnapshot`
    :param snapshot:
        The directory snapshot which will be compared
        with the reference snapshot.
    :type snapshot:
        :class:`DirectorySnapshot`
    :param ignore_device:
        A boolean indicating whether to ignore the device id or not.
        By default, a file may be uniquely identified by a combination of its first
        inode and its device id. The problem is that the device id may (or may not)
        change between system boots. This problem would cause the DirectorySnapshotDiff
        to think a file has been deleted and created again but it would be the
        exact same file.
        Set to True only if you are sure you will always use the same device.
    :type ignore_device:
        :class:`bool`
    """

    def __init__(
        self,
        ref: DirectorySnapshot,
        snapshot: DirectorySnapshot,
        *,
        ignore_device: bool = False,
    ) -> None:
        # ``DirectorySnapshot.paths`` builds a new set on every access, so read
        # it once per snapshot instead of once per use.
        ref_paths = ref.paths
        snapshot_paths = snapshot.paths
        common_paths = ref_paths & snapshot_paths

        # Set difference returns new sets, so mutating these below never
        # touches the snapshots themselves.
        created = snapshot_paths - ref_paths
        deleted = ref_paths - snapshot_paths

        def get_inode(directory: DirectorySnapshot, full_path: bytes | str) -> int | tuple[int, int]:
            # With ``ignore_device`` only the inode number is compared;
            # otherwise the full (inode, device) pair is.
            inode = directory.inode(full_path)
            return inode[0] if ignore_device else inode

        # A path present in both snapshots but backed by a different inode is
        # treated as "deleted, then created again".
        for path in common_paths:
            if get_inode(ref, path) != get_inode(snapshot, path):
                created.add(path)
                deleted.add(path)

        # find moved paths
        moved: set[tuple[bytes | str, bytes | str]] = set()
        # Iterate over copies because the underlying sets are modified in the loops.
        for path in set(deleted):
            inode = ref.inode(path)
            new_path = snapshot.path(inode)
            if new_path:
                # file is not deleted but moved
                deleted.remove(path)
                moved.add((path, new_path))

        for path in set(created):
            inode = snapshot.inode(path)
            old_path = ref.path(inode)
            if old_path:
                # file is not created but moved
                created.remove(path)
                moved.add((old_path, path))

        # find modified paths
        # first check paths that have not moved
        modified: set[bytes | str] = set()
        for path in common_paths:
            if get_inode(ref, path) == get_inode(snapshot, path) and (
                ref.mtime(path) != snapshot.mtime(path) or ref.size(path) != snapshot.size(path)
            ):
                modified.add(path)

        # then check moved paths; a moved-and-changed entry is reported under its old path
        for old_path, new_path in moved:
            if ref.mtime(old_path) != snapshot.mtime(new_path) or ref.size(old_path) != snapshot.size(new_path):
                modified.add(old_path)

        # Directory-ness of created paths comes from the new snapshot; for
        # everything else it comes from the reference snapshot.
        self._dirs_created = [path for path in created if snapshot.isdir(path)]
        self._dirs_deleted = [path for path in deleted if ref.isdir(path)]
        self._dirs_modified = [path for path in modified if ref.isdir(path)]
        self._dirs_moved = [(frm, to) for (frm, to) in moved if ref.isdir(frm)]

        # Whatever is not a directory is reported as a file.
        self._files_created = list(created - set(self._dirs_created))
        self._files_deleted = list(deleted - set(self._dirs_deleted))
        self._files_modified = list(modified - set(self._dirs_modified))
        self._files_moved = list(moved - set(self._dirs_moved))

    def __str__(self) -> str:
        return self.__repr__()

    def __repr__(self) -> str:
        """Summarise the diff as per-category counts for files and folders."""
        fmt = (
            "<{0} files(created={1}, deleted={2}, modified={3}, moved={4}),"
            " folders(created={5}, deleted={6}, modified={7}, moved={8})>"
        )
        return fmt.format(
            type(self).__name__,
            len(self._files_created),
            len(self._files_deleted),
            len(self._files_modified),
            len(self._files_moved),
            len(self._dirs_created),
            len(self._dirs_deleted),
            len(self._dirs_modified),
            len(self._dirs_moved),
        )

    @property
    def files_created(self) -> list[bytes | str]:
        """List of files that were created."""
        return self._files_created

    @property
    def files_deleted(self) -> list[bytes | str]:
        """List of files that were deleted."""
        return self._files_deleted

    @property
    def files_modified(self) -> list[bytes | str]:
        """List of files that were modified."""
        return self._files_modified

    @property
    def files_moved(self) -> list[tuple[bytes | str, bytes | str]]:
        """List of files that were moved.

        Each event is a two-tuple the first item of which is the path
        that has been renamed to the second item in the tuple.
        """
        return self._files_moved

    @property
    def dirs_modified(self) -> list[bytes | str]:
        """List of directories that were modified."""
        return self._dirs_modified

    @property
    def dirs_moved(self) -> list[tuple[bytes | str, bytes | str]]:
        """List of directories that were moved.

        Each event is a two-tuple the first item of which is the path
        that has been renamed to the second item in the tuple.
        """
        return self._dirs_moved

    @property
    def dirs_deleted(self) -> list[bytes | str]:
        """List of directories that were deleted."""
        return self._dirs_deleted

    @property
    def dirs_created(self) -> list[bytes | str]:
        """List of directories that were created."""
        return self._dirs_created

    class ContextManager:
        """Context manager that creates two directory snapshots and a
        diff object that represents the difference between the two snapshots.

        A snapshot is taken on entry (``pre_snapshot``) and another on exit
        (``post_snapshot``); their difference is then available as ``diff``.
        Entering the context returns the manager itself, so the result can be
        read through the ``as`` target once the ``with`` block has finished.

        :param path:
            The directory path for which a snapshot should be taken.
        :type path:
            ``str``
        :param recursive:
            ``True`` if the entire directory tree should be included in the
            snapshot; ``False`` otherwise.
        :type recursive:
            ``bool``
        :param stat:
            Use custom stat function that returns a stat structure for path.
            Currently only st_dev, st_ino, st_mode, st_mtime and st_size are needed.

            A function taking a ``path`` as argument which will be called
            for every entry in the directory tree.
        :param listdir:
            Use custom listdir function. For details see ``os.scandir``.
        :param ignore_device:
            A boolean indicating whether to ignore the device id or not.
            By default, a file may be uniquely identified by a combination of its first
            inode and its device id. The problem is that the device id may (or may not)
            change between system boots. This problem would cause the DirectorySnapshotDiff
            to think a file has been deleted and created again but it would be the
            exact same file.
            Set to True only if you are sure you will always use the same device.
        :type ignore_device:
            :class:`bool`
        """

        def __init__(
            self,
            path: str,
            *,
            recursive: bool = True,
            stat: Callable[[str], os.stat_result] = os.stat,
            listdir: Callable[[str | None], Iterator[os.DirEntry]] = os.scandir,
            ignore_device: bool = False,
        ) -> None:
            self.path = path
            self.recursive = recursive
            self.stat = stat
            self.listdir = listdir
            self.ignore_device = ignore_device

        def __enter__(self) -> DirectorySnapshotDiff.ContextManager:
            """Take the "before" snapshot and return this manager."""
            self.pre_snapshot = self.get_snapshot()
            # Return the manager so ``with ... as cm`` gives access to ``cm.diff``.
            return self

        def __exit__(self, *args: object) -> None:
            """Take the "after" snapshot and compute ``diff``.

            Returns ``None``, so an exception raised inside the ``with`` block
            is not suppressed.
            """
            self.post_snapshot = self.get_snapshot()
            self.diff = DirectorySnapshotDiff(
                self.pre_snapshot,
                self.post_snapshot,
                ignore_device=self.ignore_device,
            )

        def get_snapshot(self) -> DirectorySnapshot:
            """Build a :class:`DirectorySnapshot` of ``path`` with the configured options."""
            return DirectorySnapshot(
                path=self.path,
                recursive=self.recursive,
                stat=self.stat,
                listdir=self.listdir,
            )


class DirectorySnapshot:
    """A snapshot of stat information of files in a directory.

    :param path:
        The directory path for which a snapshot should be taken.
    :type path:
        ``str``
    :param recursive:
        ``True`` if the entire directory tree should be included in the
        snapshot; ``False`` otherwise.
    :type recursive:
        ``bool``
    :param stat:
        Use custom stat function that returns a stat structure for path.
        Currently only st_dev, st_ino, st_mode, st_mtime and st_size are needed.

        A function taking a ``path`` as argument which will be called
        for every entry in the directory tree.
    :param listdir:
        Use custom listdir function. For details see ``os.scandir``.
    """

    def __init__(
        self,
        path: str,
        *,
        recursive: bool = True,
        stat: Callable[[str], os.stat_result] = os.stat,
        listdir: Callable[[str | None], Iterator[os.DirEntry]] = os.scandir,
    ) -> None:
        self.recursive = recursive
        self.stat = stat
        self.listdir = listdir

        # path -> stat result, and (st_ino, st_dev) -> path.
        self._stat_info: dict[bytes | str, os.stat_result] = {}
        self._inode_to_path: dict[tuple[int, int], bytes | str] = {}

        # The root itself is part of the snapshot; an error from stat() on it
        # is not caught here.
        st = self.stat(path)
        self._stat_info[path] = st
        self._inode_to_path[(st.st_ino, st.st_dev)] = path

        for p, st in self.walk(path):
            i = (st.st_ino, st.st_dev)
            # If several paths share an id, the one walked last wins.
            self._inode_to_path[i] = p
            self._stat_info[p] = st

    def walk(self, root: str) -> Iterator[tuple[str, os.stat_result]]:
        """Yield ``(path, stat_result)`` for the entries found below ``root``.

        All entries of ``root`` are yielded first; if the snapshot is
        recursive, each entry that is a directory is then walked in turn.
        ``root`` itself is not yielded.

        :param root:
            The directory whose entries should be listed.
        :raises OSError:
            If listing ``root`` fails for a reason other than ``ENOENT``,
            ``ENOTDIR`` or ``EINVAL``.
        """
        try:
            paths = [os.path.join(root, entry.name) for entry in self.listdir(root)]
        except OSError as e:
            # Directory may have been deleted between finding it in the directory
            # list of its parent and trying to list its contents. If this
            # happens we treat it as empty. Likewise if the directory was replaced
            # with a file of the same name (less likely, but possible).
            if e.errno in (errno.ENOENT, errno.ENOTDIR, errno.EINVAL):
                return
            raise

        entries = []
        for p in paths:
            # Entries whose stat() raises OSError are left out of the snapshot.
            with contextlib.suppress(OSError):
                entry = (p, self.stat(p))
                entries.append(entry)
                yield entry

        if self.recursive:
            for path, st in entries:
                # A PermissionError raised while descending abandons that subtree only.
                with contextlib.suppress(PermissionError):
                    if S_ISDIR(st.st_mode):
                        yield from self.walk(path)

    @property
    def paths(self) -> set[bytes | str]:
        """Set of file/directory paths in the snapshot."""
        return set(self._stat_info.keys())

    def path(self, uid: tuple[int, int]) -> bytes | str | None:
        """Returns path for id. None if id is unknown to this snapshot."""
        return self._inode_to_path.get(uid)

    def inode(self, path: bytes | str) -> tuple[int, int]:
        """Returns an id for path, as an ``(st_ino, st_dev)`` tuple."""
        st = self._stat_info[path]
        return (st.st_ino, st.st_dev)

    def isdir(self, path: bytes | str) -> bool:
        """Returns whether path was a directory according to its recorded ``st_mode``."""
        return S_ISDIR(self._stat_info[path].st_mode)

    def mtime(self, path: bytes | str) -> float:
        """Returns the recorded ``st_mtime`` of path."""
        return self._stat_info[path].st_mtime

    def size(self, path: bytes | str) -> int:
        """Returns the recorded ``st_size`` of path."""
        return self._stat_info[path].st_size

    def stat_info(self, path: bytes | str) -> os.stat_result:
        """Returns a stat information object for the specified path from
        the snapshot.

        Attached information is subject to change. Do not use unless
        you specify `stat` in constructor. Use :func:`inode`, :func:`mtime`,
        :func:`isdir` instead.

        :param path:
            The path for which stat information should be obtained
            from a snapshot.
        """
        return self._stat_info[path]

    def __sub__(self, previous_dirsnap: DirectorySnapshot) -> DirectorySnapshotDiff:
        """Allow subtracting a DirectorySnapshot object instance from
        another.

        :returns:
            A :class:`DirectorySnapshotDiff` object.
        """
        return DirectorySnapshotDiff(previous_dirsnap, self)

    def __str__(self) -> str:
        return self.__repr__()

    def __repr__(self) -> str:
        return str(self._stat_info)


class EmptyDirectorySnapshot(DirectorySnapshot):
    """Class to implement an empty snapshot. This is used together with
    DirectorySnapshot and DirectorySnapshotDiff in order to get all the files/folders
    in the directory as created.
    """

    def __init__(self) -> None:
        # DirectorySnapshot.__init__ is deliberately not called: it needs a
        # path and stats it. The lookup tables are still created (empty) so
        # that inherited helpers such as __repr__ work.
        self._stat_info: dict[bytes | str, os.stat_result] = {}
        self._inode_to_path: dict[tuple[int, int], bytes | str] = {}

    @staticmethod
    def path(_: Any) -> None:
        """Mock up method to return the path of the received inode. As the snapshot
        is intended to be empty, it always returns None.

        :returns:
            None.
        """
        return None

    @property
    def paths(self) -> set:
        """Mock up method to return a set of file/directory paths in the snapshot. As
        the snapshot is intended to be empty, it always returns an empty set.

        :returns:
            An empty set.
        """
        return set()