1""":module: watchdog.observers.polling 2:synopsis: Polling emitter implementation. 3:author: [email protected] (Yesudeep Mangalapilly) 4:author: [email protected] (Mickaël Schoentgen) 5 6Classes 7------- 8.. autoclass:: PollingObserver 9 :members: 10 :show-inheritance: 11 12.. autoclass:: PollingObserverVFS 13 :members: 14 :show-inheritance: 15 :special-members: 16""" 17 18from __future__ import annotations 19 20import os 21import threading 22from functools import partial 23from typing import TYPE_CHECKING 24 25from watchdog.events import ( 26 DirCreatedEvent, 27 DirDeletedEvent, 28 DirModifiedEvent, 29 DirMovedEvent, 30 FileCreatedEvent, 31 FileDeletedEvent, 32 FileModifiedEvent, 33 FileMovedEvent, 34) 35from watchdog.observers.api import DEFAULT_EMITTER_TIMEOUT, DEFAULT_OBSERVER_TIMEOUT, BaseObserver, EventEmitter 36from watchdog.utils.dirsnapshot import DirectorySnapshot, DirectorySnapshotDiff, EmptyDirectorySnapshot 37 38if TYPE_CHECKING: 39 from collections.abc import Iterator 40 from typing import Callable 41 42 from watchdog.events import FileSystemEvent 43 from watchdog.observers.api import EventQueue, ObservedWatch 44 45 46class PollingEmitter(EventEmitter): 47 """Platform-independent emitter that polls a directory to detect file 48 system changes. 49 """ 50 51 def __init__( 52 self, 53 event_queue: EventQueue, 54 watch: ObservedWatch, 55 *, 56 timeout: float = DEFAULT_EMITTER_TIMEOUT, 57 event_filter: list[type[FileSystemEvent]] | None = None, 58 stat: Callable[[str], os.stat_result] = os.stat, 59 listdir: Callable[[str | None], Iterator[os.DirEntry]] = os.scandir, 60 ) -> None: 61 super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter) 62 self._snapshot: DirectorySnapshot = EmptyDirectorySnapshot() 63 self._lock = threading.Lock() 64 self._take_snapshot: Callable[[], DirectorySnapshot] = lambda: DirectorySnapshot( 65 self.watch.path, 66 recursive=self.watch.is_recursive, 67 stat=stat, 68 listdir=listdir, 69 ) 70 71 def on_thread_start(self) -> None: 72 self._snapshot = self._take_snapshot() 73 74 def queue_events(self, timeout: float) -> None: 75 # We don't want to hit the disk continuously. 76 # timeout behaves like an interval for polling emitters. 77 if self.stopped_event.wait(timeout): 78 return 79 80 with self._lock: 81 if not self.should_keep_running(): 82 return 83 84 # Get event diff between fresh snapshot and previous snapshot. 85 # Update snapshot. 86 try: 87 new_snapshot = self._take_snapshot() 88 except OSError: 89 self.queue_event(DirDeletedEvent(self.watch.path)) 90 self.stop() 91 return 92 93 events = DirectorySnapshotDiff(self._snapshot, new_snapshot) 94 self._snapshot = new_snapshot 95 96 # Files. 97 for src_path in events.files_deleted: 98 self.queue_event(FileDeletedEvent(src_path)) 99 for src_path in events.files_modified: 100 self.queue_event(FileModifiedEvent(src_path)) 101 for src_path in events.files_created: 102 self.queue_event(FileCreatedEvent(src_path)) 103 for src_path, dest_path in events.files_moved: 104 self.queue_event(FileMovedEvent(src_path, dest_path)) 105 106 # Directories. 107 for src_path in events.dirs_deleted: 108 self.queue_event(DirDeletedEvent(src_path)) 109 for src_path in events.dirs_modified: 110 self.queue_event(DirModifiedEvent(src_path)) 111 for src_path in events.dirs_created: 112 self.queue_event(DirCreatedEvent(src_path)) 113 for src_path, dest_path in events.dirs_moved: 114 self.queue_event(DirMovedEvent(src_path, dest_path)) 115 116 117class PollingObserver(BaseObserver): 118 """Platform-independent observer that polls a directory to detect file 119 system changes. 120 """ 121 122 def __init__(self, *, timeout: float = DEFAULT_OBSERVER_TIMEOUT) -> None: 123 super().__init__(PollingEmitter, timeout=timeout) 124 125 126class PollingObserverVFS(BaseObserver): 127 """File system independent observer that polls a directory to detect changes.""" 128 129 def __init__( 130 self, 131 stat: Callable[[str], os.stat_result], 132 listdir: Callable[[str | None], Iterator[os.DirEntry]], 133 *, 134 polling_interval: int = 1, 135 ) -> None: 136 """:param stat: stat function. See ``os.stat`` for details. 137 :param listdir: listdir function. See ``os.scandir`` for details. 138 :type polling_interval: int 139 :param polling_interval: interval in seconds between polling the file system. 140 """ 141 emitter_cls = partial(PollingEmitter, stat=stat, listdir=listdir) 142 super().__init__(emitter_cls, timeout=polling_interval) # type: ignore[arg-type] 143