1""":module: watchdog.observers.fsevents
2:synopsis: FSEvents based emitter implementation.
3:author: [email protected] (Yesudeep Mangalapilly)
4:author: [email protected] (Mickaël Schoentgen)
5:platforms: macOS
6"""
7
8from __future__ import annotations
9
10import logging
11import os
12import threading
13import time
14import unicodedata
15from typing import TYPE_CHECKING
16
17import _watchdog_fsevents as _fsevents
18
19from watchdog.events import (
20    DirCreatedEvent,
21    DirDeletedEvent,
22    DirModifiedEvent,
23    DirMovedEvent,
24    FileCreatedEvent,
25    FileDeletedEvent,
26    FileModifiedEvent,
27    FileMovedEvent,
28    generate_sub_created_events,
29    generate_sub_moved_events,
30)
31from watchdog.observers.api import DEFAULT_EMITTER_TIMEOUT, DEFAULT_OBSERVER_TIMEOUT, BaseObserver, EventEmitter
32from watchdog.utils.dirsnapshot import DirectorySnapshot
33
34if TYPE_CHECKING:
35    from watchdog.events import FileSystemEvent, FileSystemEventHandler
36    from watchdog.observers.api import EventQueue, ObservedWatch
37
38
39logger = logging.getLogger("fsevents")
40
41
class FSEventsEmitter(EventEmitter):
    """macOS FSEvents Emitter class.

    Receives batches of native FSEvents through :meth:`events_callback`,
    translates them into watchdog file system events in :meth:`queue_events`
    and pushes them onto the event queue via :meth:`queue_event`.

    :param event_queue:
        The event queue to fill with events.
    :param watch:
        A watch object representing the directory to monitor.
    :type watch:
        :class:`watchdog.observers.api.ObservedWatch`
    :param timeout:
        Read events blocking timeout (in seconds).
    :type timeout:
        ``float``
    :param event_filter:
        Collection of event types to emit, or None for no filtering (default).
    :param suppress_history:
        The FSEvents API may emit historic events up to 30 sec before the watch was
        started. When ``suppress_history`` is ``True``, those events will be suppressed
        by creating a directory snapshot of the watched path before starting the stream
        as a reference to suppress old events. Warning: This may result in significant
        memory usage in case of a large number of items in the watched path.
    :type suppress_history:
        ``bool``
    """

    def __init__(
        self,
        event_queue: EventQueue,
        watch: ObservedWatch,
        *,
        timeout: float = DEFAULT_EMITTER_TIMEOUT,
        event_filter: list[type[FileSystemEvent]] | None = None,
        suppress_history: bool = False,
    ) -> None:
        super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter)
        # Inodes of items this emitter has already seen; used to filter out
        # spurious "created" flags on later (coalesced) native events.
        self._fs_view: set[int] = set()
        self.suppress_history = suppress_history
        # Monotonic timestamp set in run(); queue_events() uses it to drop the
        # starting snapshot once more than 60 seconds have passed.
        self._start_time = 0.0
        # Snapshot of the watched path taken in on_thread_start() when
        # ``suppress_history`` is set; None otherwise (and after it is freed).
        self._starting_state: DirectorySnapshot | None = None
        # Held by events_callback() while a batch of native events is processed.
        self._lock = threading.Lock()
        # Watch path with "~" expanded, made absolute and with symlinks resolved;
        # compared against event directories in _is_recursive_event().
        self._absolute_watch_path = os.path.realpath(os.path.abspath(os.path.expanduser(self.watch.path)))

    def on_thread_stop(self) -> None:
        """Call ``_fsevents.remove_watch`` for the watch, then ``_fsevents.stop`` for this emitter."""
        _fsevents.remove_watch(self.watch)
        _fsevents.stop(self)

    def queue_event(self, event: FileSystemEvent) -> None:
        """Queue ``event``, or drop it if the watch is non-recursive and the
        event is classified as recursive by :meth:`_is_recursive_event`.

        Both outcomes are logged at DEBUG level.
        """
        # fsevents defaults to be recursive, so if the watch was meant to be non-recursive then we need to drop
        # all the events here which do not have a src_path / dest_path that matches the watched path
        if self._watch.is_recursive or not self._is_recursive_event(event):
            logger.debug("queue_event %s", event)
            EventEmitter.queue_event(self, event)
        else:
            logger.debug("drop event %s", event)

    def _is_recursive_event(self, event: FileSystemEvent) -> bool:
        """Return ``False`` if ``event`` matches the watched directory itself,
        ``True`` otherwise.

        For directory events the event's own ``src_path`` is compared with the
        absolute watch path; for file events the parent directory of
        ``src_path`` is compared. Moved events additionally match when the
        parent directory of ``dest_path`` equals the watch path.
        """
        # NOTE(review): for a directory event on a direct child of the watch
        # path, ``src_path`` is the child itself and therefore does not equal
        # the watch path here -- confirm that this is the intended behaviour
        # for non-recursive watches.
        src_path = event.src_path if event.is_directory else os.path.dirname(event.src_path)
        if src_path == self._absolute_watch_path:
            return False

        if isinstance(event, (FileMovedEvent, DirMovedEvent)):
            # when moving something into the watch path we must always take the dirname,
            # otherwise we miss out on `DirMovedEvent`s
            dest_path = os.path.dirname(event.dest_path)
            if dest_path == self._absolute_watch_path:
                return False

        return True

    def _queue_created_event(self, event: FileSystemEvent, src_path: bytes | str, dirname: bytes | str) -> None:
        """Queue a created event for ``src_path`` followed by a
        ``DirModifiedEvent`` for ``dirname``.

        ``event.is_directory`` selects between ``DirCreatedEvent`` and
        ``FileCreatedEvent``.
        """
        cls = DirCreatedEvent if event.is_directory else FileCreatedEvent
        self.queue_event(cls(src_path))
        self.queue_event(DirModifiedEvent(dirname))

    def _queue_deleted_event(self, event: FileSystemEvent, src_path: bytes | str, dirname: bytes | str) -> None:
        """Queue a deleted event for ``src_path`` followed by a
        ``DirModifiedEvent`` for ``dirname``.

        ``event.is_directory`` selects between ``DirDeletedEvent`` and
        ``FileDeletedEvent``.
        """
        cls = DirDeletedEvent if event.is_directory else FileDeletedEvent
        self.queue_event(cls(src_path))
        self.queue_event(DirModifiedEvent(dirname))

    def _queue_modified_event(self, event: FileSystemEvent, src_path: bytes | str, dirname: bytes | str) -> None:
        """Queue a ``DirModifiedEvent`` or ``FileModifiedEvent`` for ``src_path``.

        ``dirname`` is accepted so the signature matches the other
        ``_queue_*_event`` helpers, but it is not used here.
        """
        cls = DirModifiedEvent if event.is_directory else FileModifiedEvent
        self.queue_event(cls(src_path))

    def _queue_renamed_event(
        self,
        src_event: FileSystemEvent,
        src_path: bytes | str,
        dst_path: bytes | str,
        src_dirname: bytes | str,
        dst_dirname: bytes | str,
    ) -> None:
        """Queue a moved event from ``src_path`` to ``dst_path``, followed by
        ``DirModifiedEvent``\\ s for the source and the destination directory.

        ``src_event.is_directory`` selects between ``DirMovedEvent`` and
        ``FileMovedEvent``.
        """
        cls = DirMovedEvent if src_event.is_directory else FileMovedEvent
        # Make sure the destination has the same type (bytes/str) as the watch path.
        dst_path = self._encode_path(dst_path)
        self.queue_event(cls(src_path, dst_path))
        self.queue_event(DirModifiedEvent(src_dirname))
        self.queue_event(DirModifiedEvent(dst_dirname))

    def _is_historic_created_event(self, event: _fsevents.NativeEvent) -> bool:
        """Return ``True`` if a "created" flag on ``event`` should be ignored.

        That is the case when the event's inode is already in ``_fs_view``, or
        when the starting snapshot (if any) records the same inode at
        ``event.path``.
        """
        # We only queue a created event if the item was created after we
        # started the FSEventsStream.

        in_history = event.inode in self._fs_view

        if self._starting_state:
            try:
                # presumably ``inode()`` returns an ``(inode, device)`` pair and
                # raises KeyError for paths missing from the snapshot -- verify
                # against DirectorySnapshot.
                old_inode = self._starting_state.inode(event.path)[0]
                before_start = old_inode == event.inode
            except KeyError:
                before_start = False
        else:
            before_start = False

        return in_history or before_start

    @staticmethod
    def _is_meta_mod(event: _fsevents.NativeEvent) -> bool:
        """Returns True if the event indicates a change in metadata."""
        return event.is_inode_meta_mod or event.is_xattr_mod or event.is_owner_change

    def queue_events(self, timeout: float, events: list[_fsevents.NativeEvent]) -> None:  # type: ignore[override]
        """Translate a batch of native FSEvents into watchdog events and queue them.

        :param timeout:
            Not used by this implementation.
        :param events:
            Native events to process. The list is consumed in place: events are
            popped from the front, and the destination event of a rename is
            removed when its source event is handled.
        """
        if logger.getEffectiveLevel() <= logging.DEBUG:
            for event in events:
                # Collect the names of all attributes whose value is exactly True.
                flags = ", ".join(attr for attr in dir(event) if getattr(event, attr) is True)
                logger.debug("%s: %s", event, flags)

        if time.monotonic() - self._start_time > 60:
            # Event history is no longer needed, let's free some memory.
            self._starting_state = None

        while events:
            event = events.pop(0)

            src_path = self._encode_path(event.path)
            src_dirname = os.path.dirname(src_path)

            try:
                stat = os.stat(src_path)
            except OSError:
                stat = None

            # Truthy only if something currently exists at ``src_path`` and it
            # has the inode this event refers to.
            exists = stat and stat.st_ino == event.inode

            # FSevents may coalesce multiple events for the same item + path into a
            # single event. However, events are never coalesced for different items at
            # the same path or for the same item at different paths. Therefore, the
            # event chains "removed -> created" and "created -> renamed -> removed" will
            # never emit a single native event and a deleted event *always* means that
            # the item no longer existed at the end of the event chain.

            # Some events will have a spurious `is_created` flag set, coalesced from an
            # already emitted and processed CreatedEvent. To filter those, we keep track
            # of all inodes which we know to be already created. This is safer than
            # keeping track of paths since paths are more likely to be reused than
            # inodes.

            # Likewise, some events will have a spurious `is_modified`,
            # `is_inode_meta_mod` or `is_xattr_mod` flag set. We currently do not
            # suppress those but could do so if the item still exists by caching the
            # stat result and verifying that it did change.

            if event.is_created and event.is_removed:
                # Events will only be coalesced for the same item / inode.
                # The sequence deleted -> created therefore cannot occur.
                # Any combination with renamed cannot occur either.

                if not self._is_historic_created_event(event):
                    self._queue_created_event(event, src_path, src_dirname)

                self._fs_view.add(event.inode)

                if event.is_modified or self._is_meta_mod(event):
                    self._queue_modified_event(event, src_path, src_dirname)

                self._queue_deleted_event(event, src_path, src_dirname)
                self._fs_view.discard(event.inode)

            else:
                if event.is_created and not self._is_historic_created_event(event):
                    self._queue_created_event(event, src_path, src_dirname)

                self._fs_view.add(event.inode)

                if event.is_modified or self._is_meta_mod(event):
                    self._queue_modified_event(event, src_path, src_dirname)

                if event.is_renamed:
                    # Check if we have a corresponding destination event in the watched path.
                    # Only the events still pending in this batch are searched;
                    # the first renamed event with the same inode is taken.
                    dst_event = next(
                        iter(e for e in events if e.is_renamed and e.inode == event.inode),
                        None,
                    )

                    if dst_event:
                        # Item was moved within the watched folder.
                        logger.debug("Destination event for rename is %s", dst_event)

                        dst_path = self._encode_path(dst_event.path)
                        dst_dirname = os.path.dirname(dst_path)

                        self._queue_renamed_event(event, src_path, dst_path, src_dirname, dst_dirname)
                        self._fs_view.add(event.inode)

                        for sub_moved_event in generate_sub_moved_events(src_path, dst_path):
                            self.queue_event(sub_moved_event)

                        # Process any coalesced flags for the dst_event.

                        # Remove it from the batch so the main loop does not
                        # process it a second time.
                        events.remove(dst_event)

                        if dst_event.is_modified or self._is_meta_mod(dst_event):
                            self._queue_modified_event(dst_event, dst_path, dst_dirname)

                        if dst_event.is_removed:
                            self._queue_deleted_event(dst_event, dst_path, dst_dirname)
                            self._fs_view.discard(dst_event.inode)

                    elif exists:
                        # This is the destination event, item was moved into the watched
                        # folder.
                        self._queue_created_event(event, src_path, src_dirname)
                        self._fs_view.add(event.inode)

                        for sub_created_event in generate_sub_created_events(src_path):
                            self.queue_event(sub_created_event)

                    else:
                        # This is the source event, item was moved out of the watched
                        # folder.
                        self._queue_deleted_event(event, src_path, src_dirname)
                        self._fs_view.discard(event.inode)

                        # Skip further coalesced processing.
                        # Note that this also skips the ``is_root_changed``
                        # check at the end of the loop body for this event.
                        continue

                if event.is_removed:
                    # Won't occur together with renamed.
                    self._queue_deleted_event(event, src_path, src_dirname)
                    self._fs_view.discard(event.inode)

            if event.is_root_changed:
                # This will be set if root or any of its parents is renamed or deleted.
                # TODO: find out new path and generate DirMovedEvent?
                self.queue_event(DirDeletedEvent(self.watch.path))
                logger.debug("Stopping because root path was changed")
                self.stop()

                # Forget all tracked inodes once the emitter has been stopped.
                self._fs_view.clear()

    def events_callback(self, paths: list[bytes], inodes: list[int], flags: list[int], ids: list[int]) -> None:
        """Callback passed to FSEventStreamCreate(), it will receive all
        FS events and queue them.

        The four lists are combined element-wise into ``NativeEvent`` objects,
        which are then processed by :meth:`queue_events` while holding
        ``self._lock``. Any exception is logged and not re-raised.
        """
        cls = _fsevents.NativeEvent
        try:
            events = [
                cls(path, inode, event_flags, event_id)
                for path, inode, event_flags, event_id in zip(paths, inodes, flags, ids)
            ]
            with self._lock:
                self.queue_events(self.timeout, events)
        except Exception:
            logger.exception("Unhandled exception in fsevents callback")

    def run(self) -> None:
        """Register the watch with ``_fsevents`` and read events for this emitter.

        Records the monotonic start time first. Any exception raised by the
        ``_fsevents`` calls is logged and not re-raised.
        """
        self.pathnames = [self.watch.path]
        self._start_time = time.monotonic()
        try:
            _fsevents.add_watch(self, self.watch, self.events_callback, self.pathnames)
            # presumably blocks until the emitter is stopped -- TODO confirm
            # against the _watchdog_fsevents extension.
            _fsevents.read_events(self)
        except Exception:
            logger.exception("Unhandled exception in FSEventsEmitter")

    def on_thread_start(self) -> None:
        """Take the starting directory snapshot when ``suppress_history`` is set."""
        if self.suppress_history:
            # DirectorySnapshot is given a ``str`` path, so a bytes watch path is decoded first.
            watch_path = os.fsdecode(self.watch.path) if isinstance(self.watch.path, bytes) else self.watch.path
            self._starting_state = DirectorySnapshot(watch_path)

    def _encode_path(self, path: bytes | str) -> bytes | str:
        """Encode path only if bytes were passed to this emitter."""
        return os.fsencode(path) if isinstance(self.watch.path, bytes) else path
320
321
class FSEventsObserver(BaseObserver):
    """Observer that uses :class:`FSEventsEmitter` as its emitter class."""

    def __init__(self, *, timeout: float = DEFAULT_OBSERVER_TIMEOUT) -> None:
        """Create the observer.

        :param timeout:
            Timeout (in seconds) forwarded to ``BaseObserver``.
        """
        super().__init__(FSEventsEmitter, timeout=timeout)

    def schedule(
        self,
        event_handler: FileSystemEventHandler,
        path: str,
        *,
        recursive: bool = False,
        event_filter: list[type[FileSystemEvent]] | None = None,
    ) -> ObservedWatch:
        """Schedule a watch for ``path`` after NFC-normalising it.

        Only ``str`` paths are normalised; any other value is handed to the
        base class unchanged. All remaining arguments are forwarded as given.

        :returns:
            The ``ObservedWatch`` returned by ``BaseObserver.schedule``.
        """
        # Fix for issue #26: Trace/BPT error when given a unicode path
        # string. https://github.com/gorakhargosh/watchdog/issues#issue/26
        normalized_path = unicodedata.normalize("NFC", path) if isinstance(path, str) else path

        return super().schedule(
            event_handler,
            normalized_path,
            recursive=recursive,
            event_filter=event_filter,
        )
340