""":module: watchdog.observers.fsevents
:synopsis: FSEvents based emitter implementation.
:author: [email protected] (Yesudeep Mangalapilly)
:author: [email protected] (Mickaël Schoentgen)
:platforms: macOS
"""

from __future__ import annotations

import logging
import os
import threading
import time
import unicodedata
from typing import TYPE_CHECKING

import _watchdog_fsevents as _fsevents

from watchdog.events import (
    DirCreatedEvent,
    DirDeletedEvent,
    DirModifiedEvent,
    DirMovedEvent,
    FileCreatedEvent,
    FileDeletedEvent,
    FileModifiedEvent,
    FileMovedEvent,
    generate_sub_created_events,
    generate_sub_moved_events,
)
from watchdog.observers.api import DEFAULT_EMITTER_TIMEOUT, DEFAULT_OBSERVER_TIMEOUT, BaseObserver, EventEmitter
from watchdog.utils.dirsnapshot import DirectorySnapshot

if TYPE_CHECKING:
    from watchdog.events import FileSystemEvent, FileSystemEventHandler
    from watchdog.observers.api import EventQueue, ObservedWatch


logger = logging.getLogger("fsevents")


class FSEventsEmitter(EventEmitter):
    """macOS FSEvents Emitter class.

    :param event_queue:
        The event queue to fill with events.
    :param watch:
        A watch object representing the directory to monitor.
    :type watch:
        :class:`watchdog.observers.api.ObservedWatch`
    :param timeout:
        Read events blocking timeout (in seconds).
    :type timeout:
        ``float``
    :param event_filter:
        Collection of event types to emit, or None for no filtering (default).
    :param suppress_history:
        The FSEvents API may emit historic events up to 30 sec before the watch was
        started. When ``suppress_history`` is ``True``, those events will be suppressed
        by creating a directory snapshot of the watched path before starting the stream
        as a reference to suppress old events. Warning: This may result in significant
        memory usage in case of a large number of items in the watched path.
    :type suppress_history:
        ``bool``
    """

    def __init__(
        self,
        event_queue: EventQueue,
        watch: ObservedWatch,
        *,
        timeout: float = DEFAULT_EMITTER_TIMEOUT,
        event_filter: list[type[FileSystemEvent]] | None = None,
        suppress_history: bool = False,
    ) -> None:
        super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter)
        # Inodes for which a created event has already been handled; used to
        # filter out spurious `is_created` flags on later native events.
        self._fs_view: set[int] = set()
        self.suppress_history = suppress_history
        # Set to time.monotonic() in run(); 0.0 until the emitter has started.
        self._start_time = 0.0
        # Snapshot of the watched tree taken in on_thread_start() when
        # suppress_history is enabled; released again by queue_events().
        self._starting_state: DirectorySnapshot | None = None
        # Serializes queue_events() calls made from events_callback().
        self._lock = threading.Lock()
        # Fully resolved watch path, compared against event paths when deciding
        # whether an event belongs to a sub-directory (see _is_recursive_event).
        self._absolute_watch_path = os.path.realpath(os.path.abspath(os.path.expanduser(self.watch.path)))

    def on_thread_stop(self) -> None:
        """Unregister this emitter's watch from the native module and stop it."""
        _fsevents.remove_watch(self.watch)
        _fsevents.stop(self)

    def queue_event(self, event: FileSystemEvent) -> None:
        """Queue ``event`` unless it must be dropped for a non-recursive watch.

        :param event:
            The event to forward to :meth:`EventEmitter.queue_event`.
        """
        # fsevents defaults to be recursive, so if the watch was meant to be non-recursive then we need to drop
        # all the events here which do not have a src_path / dest_path that matches the watched path
        if self._watch.is_recursive or not self._is_recursive_event(event):
            logger.debug("queue_event %s", event)
            EventEmitter.queue_event(self, event)
        else:
            logger.debug("drop event %s", event)

    def _is_recursive_event(self, event: FileSystemEvent) -> bool:
        """Return ``True`` if ``event`` does not directly concern the watched path.

        A directory event is compared by its own path, a file event by its parent
        directory. Moved events are additionally accepted when the parent directory
        of their destination is the watched path.
        """
        src_path = event.src_path if event.is_directory else os.path.dirname(event.src_path)
        if src_path == self._absolute_watch_path:
            return False

        if isinstance(event, (FileMovedEvent, DirMovedEvent)):
            # when moving something into the watch path we must always take the dirname,
            # otherwise we miss out on `DirMovedEvent`s
            dest_path = os.path.dirname(event.dest_path)
            if dest_path == self._absolute_watch_path:
                return False

        return True

    def _queue_created_event(self, event: FileSystemEvent, src_path: bytes | str, dirname: bytes | str) -> None:
        """Queue a created event for ``src_path`` plus a modified event for ``dirname``."""
        cls = DirCreatedEvent if event.is_directory else FileCreatedEvent
        self.queue_event(cls(src_path))
        self.queue_event(DirModifiedEvent(dirname))

    def _queue_deleted_event(self, event: FileSystemEvent, src_path: bytes | str, dirname: bytes | str) -> None:
        """Queue a deleted event for ``src_path`` plus a modified event for ``dirname``."""
        cls = DirDeletedEvent if event.is_directory else FileDeletedEvent
        self.queue_event(cls(src_path))
        self.queue_event(DirModifiedEvent(dirname))

    def _queue_modified_event(self, event: FileSystemEvent, src_path: bytes | str, dirname: bytes | str) -> None:
        """Queue a modified event for ``src_path``.

        ``dirname`` is accepted for signature parity with the other ``_queue_*``
        helpers but is not used.
        """
        cls = DirModifiedEvent if event.is_directory else FileModifiedEvent
        self.queue_event(cls(src_path))

    def _queue_renamed_event(
        self,
        src_event: FileSystemEvent,
        src_path: bytes | str,
        dst_path: bytes | str,
        src_dirname: bytes | str,
        dst_dirname: bytes | str,
    ) -> None:
        """Queue a moved event from ``src_path`` to ``dst_path``.

        A modified event is also queued for each of the two parent directories.
        """
        cls = DirMovedEvent if src_event.is_directory else FileMovedEvent
        # queue_events() already passes an encoded path; the conversion is kept so
        # that any other caller gets the same path type as the watch.
        dst_path = self._encode_path(dst_path)
        self.queue_event(cls(src_path, dst_path))
        self.queue_event(DirModifiedEvent(src_dirname))
        self.queue_event(DirModifiedEvent(dst_dirname))

    def _is_historic_created_event(self, event: _fsevents.NativeEvent) -> bool:
        """Return ``True`` if a created flag on ``event`` should not be reported.

        That is the case when the inode was already recorded in ``_fs_view`` or
        when the start-up snapshot holds the same inode at the same path.
        """
        # We only queue a created event if the item was created after we
        # started the FSEventsStream.

        in_history = event.inode in self._fs_view

        if self._starting_state:
            try:
                old_inode = self._starting_state.inode(event.path)[0]
                before_start = old_inode == event.inode
            except KeyError:
                # A failed snapshot lookup is treated as "did not exist at start".
                before_start = False
        else:
            before_start = False

        return in_history or before_start

    @staticmethod
    def _is_meta_mod(event: _fsevents.NativeEvent) -> bool:
        """Returns True if the event indicates a change in metadata."""
        return event.is_inode_meta_mod or event.is_xattr_mod or event.is_owner_change

    def queue_events(self, timeout: float, events: list[_fsevents.NativeEvent]) -> None:  # type: ignore[override]
        """Translate a batch of native events into watchdog events and queue them.

        :param timeout:
            Not used by this implementation.
        :param events:
            Native events to process. The list is consumed in place: entries are
            popped from the front, and the destination half of a rename pair is
            removed as soon as it has been matched.
        """
        # isEnabledFor() also honours logging.disable(), so the flag strings are
        # only built when the debug records would actually be emitted.
        if logger.isEnabledFor(logging.DEBUG):
            for event in events:
                flags = ", ".join(attr for attr in dir(event) if getattr(event, attr) is True)
                logger.debug("%s: %s", event, flags)

        if time.monotonic() - self._start_time > 60:
            # Event history is no longer needed, let's free some memory.
            self._starting_state = None

        while events:
            event = events.pop(0)

            src_path = self._encode_path(event.path)
            src_dirname = os.path.dirname(src_path)

            try:
                stat = os.stat(src_path)
            except OSError:
                stat = None

            # The item counts as existing only if the path currently resolves to
            # the very inode the event refers to.
            exists = stat is not None and stat.st_ino == event.inode

            # FSevents may coalesce multiple events for the same item + path into a
            # single event. However, events are never coalesced for different items at
            # the same path or for the same item at different paths. Therefore, the
            # event chains "removed -> created" and "created -> renamed -> removed" will
            # never emit a single native event and a deleted event *always* means that
            # the item no longer existed at the end of the event chain.

            # Some events will have a spurious `is_created` flag set, coalesced from an
            # already emitted and processed CreatedEvent. To filter those, we keep track
            # of all inodes which we know to be already created. This is safer than
            # keeping track of paths since paths are more likely to be reused than
            # inodes.

            # Likewise, some events will have a spurious `is_modified`,
            # `is_inode_meta_mod` or `is_xattr_mod` flag set. We currently do not
            # suppress those but could do so if the item still exists by caching the
            # stat result and verifying that it did change.

            if event.is_created and event.is_removed:
                # Events will only be coalesced for the same item / inode.
                # The sequence deleted -> created therefore cannot occur.
                # Any combination with renamed cannot occur either.

                if not self._is_historic_created_event(event):
                    self._queue_created_event(event, src_path, src_dirname)

                self._fs_view.add(event.inode)

                if event.is_modified or self._is_meta_mod(event):
                    self._queue_modified_event(event, src_path, src_dirname)

                self._queue_deleted_event(event, src_path, src_dirname)
                self._fs_view.discard(event.inode)

            else:
                if event.is_created and not self._is_historic_created_event(event):
                    self._queue_created_event(event, src_path, src_dirname)

                self._fs_view.add(event.inode)

                if event.is_modified or self._is_meta_mod(event):
                    self._queue_modified_event(event, src_path, src_dirname)

                if event.is_renamed:
                    # Check if we have a corresponding destination event in the watched path.
                    dst_event = next(
                        (e for e in events if e.is_renamed and e.inode == event.inode),
                        None,
                    )

                    if dst_event:
                        # Item was moved within the watched folder.
                        logger.debug("Destination event for rename is %s", dst_event)

                        dst_path = self._encode_path(dst_event.path)
                        dst_dirname = os.path.dirname(dst_path)

                        self._queue_renamed_event(event, src_path, dst_path, src_dirname, dst_dirname)
                        self._fs_view.add(event.inode)

                        for sub_moved_event in generate_sub_moved_events(src_path, dst_path):
                            self.queue_event(sub_moved_event)

                        # Process any coalesced flags for the dst_event.

                        # Remove it from the pending list so that it is not
                        # processed a second time by the outer loop.
                        events.remove(dst_event)

                        if dst_event.is_modified or self._is_meta_mod(dst_event):
                            self._queue_modified_event(dst_event, dst_path, dst_dirname)

                        if dst_event.is_removed:
                            self._queue_deleted_event(dst_event, dst_path, dst_dirname)
                            self._fs_view.discard(dst_event.inode)

                    elif exists:
                        # This is the destination event, item was moved into the watched
                        # folder.
                        self._queue_created_event(event, src_path, src_dirname)
                        self._fs_view.add(event.inode)

                        for sub_created_event in generate_sub_created_events(src_path):
                            self.queue_event(sub_created_event)

                    else:
                        # This is the source event, item was moved out of the watched
                        # folder.
                        self._queue_deleted_event(event, src_path, src_dirname)
                        self._fs_view.discard(event.inode)

                        # Skip further coalesced processing.
                        continue

                if event.is_removed:
                    # Won't occur together with renamed.
                    self._queue_deleted_event(event, src_path, src_dirname)
                    self._fs_view.discard(event.inode)

                if event.is_root_changed:
                    # This will be set if root or any of its parents is renamed or deleted.
                    # TODO: find out new path and generate DirMovedEvent?
                    self.queue_event(DirDeletedEvent(self.watch.path))
                    logger.debug("Stopping because root path was changed")
                    self.stop()

                    self._fs_view.clear()

    def events_callback(self, paths: list[bytes], inodes: list[int], flags: list[int], ids: list[int]) -> None:
        """Callback passed to FSEventStreamCreate(), it will receive all
        FS events and queue them.

        The four lists are parallel: one path, inode, flag set and event id per
        native event. Any exception is logged and swallowed here rather than
        propagated to the caller of this callback.
        """
        cls = _fsevents.NativeEvent
        try:
            events = [
                cls(path, inode, event_flags, event_id)
                for path, inode, event_flags, event_id in zip(paths, inodes, flags, ids)
            ]
            with self._lock:
                self.queue_events(self.timeout, events)
        except Exception:
            logger.exception("Unhandled exception in fsevents callback")

    def run(self) -> None:
        """Register the watch with the native module and read events.

        Exceptions raised by the native calls are logged, not propagated.
        """
        self.pathnames = [self.watch.path]
        # Reference point for dropping the start-up snapshot in queue_events().
        self._start_time = time.monotonic()
        try:
            _fsevents.add_watch(self, self.watch, self.events_callback, self.pathnames)
            _fsevents.read_events(self)
        except Exception:
            logger.exception("Unhandled exception in FSEventsEmitter")

    def on_thread_start(self) -> None:
        """Take the reference snapshot of the watched path if history is suppressed."""
        if self.suppress_history:
            # The snapshot is built from a ``str`` path even for a ``bytes`` watch.
            watch_path = os.fsdecode(self.watch.path) if isinstance(self.watch.path, bytes) else self.watch.path
            self._starting_state = DirectorySnapshot(watch_path)

    def _encode_path(self, path: bytes | str) -> bytes | str:
        """Encode path only if bytes were passed to this emitter."""
        return os.fsencode(path) if isinstance(self.watch.path, bytes) else path


class FSEventsObserver(BaseObserver):
    """Observer that uses :class:`FSEventsEmitter` as its emitter class.

    :param timeout:
        Timeout forwarded to :class:`BaseObserver`.
    """

    def __init__(self, *, timeout: float = DEFAULT_OBSERVER_TIMEOUT) -> None:
        super().__init__(FSEventsEmitter, timeout=timeout)

    def schedule(
        self,
        event_handler: FileSystemEventHandler,
        path: str,
        *,
        recursive: bool = False,
        event_filter: list[type[FileSystemEvent]] | None = None,
    ) -> ObservedWatch:
        """Schedule a watch on ``path``, NFC-normalizing it first when it is a ``str``.

        All arguments are otherwise forwarded unchanged to
        :meth:`BaseObserver.schedule`, whose return value is returned.
        """
        # Fix for issue #26: Trace/BPT error when given a unicode path
        # string. https://github.com/gorakhargosh/watchdog/issues#issue/26
        if isinstance(path, str):
            path = unicodedata.normalize("NFC", path)

        return super().schedule(event_handler, path, recursive=recursive, event_filter=event_filter)