1from __future__ import annotations
2
3import contextlib
4
5import pytest
6
7from watchdog.utils import platform
8
9if not platform.is_darwin():
10    pytest.skip("macOS only.", allow_module_level=True)
11
12import logging
13import os
14import time
15from os import mkdir, rmdir
16from random import random
17from threading import Thread
18from time import sleep
19from typing import TYPE_CHECKING
20from unittest.mock import patch
21
22import _watchdog_fsevents as _fsevents  # type: ignore[import-not-found]
23
24from watchdog.events import FileSystemEventHandler
25from watchdog.observers import Observer
26from watchdog.observers.api import BaseObserver, ObservedWatch
27from watchdog.observers.fsevents import FSEventsEmitter
28
29from .shell import touch
30
31if TYPE_CHECKING:
32    from .utils import P, StartWatching, TestEventQueue
33
34logging.basicConfig(level=logging.DEBUG)
35logger = logging.getLogger(__name__)
36
37
@pytest.fixture
def observer():
    """Yield a started ``Observer`` and shut it down once the test is over."""
    running = Observer()
    running.start()
    yield running
    running.stop()
    # Best-effort join: a RuntimeError raised by join() is deliberately ignored.
    try:
        running.join()
    except RuntimeError:
        pass
46
47
# Flag bits used by the cases below; names follow the per-case comments.
_FLAG_CREATED = 0x00000100
_FLAG_REMOVED = 0x00000200
_FLAG_RENAMED = 0x00000800
_FLAG_MODIFIED = 0x00001000
_FLAG_ITEM_FINDER_MOD = 0x00002000
_FLAG_XATTR = 0x00008000


def _native_event(flags):
    """Build a ``NativeEvent`` that only differs from its siblings by *flags*."""
    return _fsevents.NativeEvent("", 0, flags, 0)


@pytest.mark.parametrize(
    ("event", "expectation"),
    [
        # invalid flags
        (_native_event(0), False),
        # renamed
        (_native_event(_FLAG_RENAMED), False),
        # renamed, removed
        (_native_event(_FLAG_RENAMED | _FLAG_REMOVED), True),
        # renamed, removed, created
        (_native_event(_FLAG_RENAMED | _FLAG_REMOVED | _FLAG_CREATED), True),
        # renamed, removed, created, itemfindermod
        (
            _native_event(_FLAG_RENAMED | _FLAG_REMOVED | _FLAG_CREATED | _FLAG_ITEM_FINDER_MOD),
            True,
        ),
        # xattr, removed, modified, itemfindermod
        (
            _native_event(_FLAG_XATTR | _FLAG_REMOVED | _FLAG_MODIFIED | _FLAG_ITEM_FINDER_MOD),
            False,
        ),
    ],
)
def test_coalesced_event_check(event, expectation):
    """``is_coalesced`` must equal the expected value for each flag combination."""
    coalesced = event.is_coalesced
    assert coalesced == expectation
73
74
def test_add_watch_twice(observer: BaseObserver, p: P) -> None:
    """Adding the same watch twice used to result in a null pointer return without an exception.

    The second ``add_watch`` call for an already registered watch must raise
    ``RuntimeError``.

    See https://github.com/gorakhargosh/watchdog/issues/765
    """

    a = p("a")
    mkdir(a)
    try:
        h = FileSystemEventHandler()
        w = ObservedWatch(a, recursive=False)

        def callback(path, inodes, flags, ids):
            pass

        _fsevents.add_watch(h, w, callback, [w.path])
        try:
            with pytest.raises(RuntimeError):
                _fsevents.add_watch(h, w, callback, [w.path])
        finally:
            # Always unregister the first (successful) watch, even when the
            # expectation above fails, so it does not outlive this test.
            _fsevents.remove_watch(w)
    finally:
        # Remove the directory regardless of the test outcome.
        rmdir(a)
94
95
def test_watcher_deletion_while_receiving_events_1(
    caplog: pytest.LogCaptureFixture,
    p: P,
    start_watching: StartWatching,
) -> None:
    """
    When the watcher is stopped while there are events, such exception could happen:

        Traceback (most recent call last):
            File "observers/fsevents.py", line 327, in events_callback
            self.queue_events(self.timeout, events)
            File "observers/fsevents.py", line 187, in queue_events
            src_path = self._encode_path(event.path)
            File "observers/fsevents.py", line 352, in _encode_path
            if isinstance(self.watch.path, bytes):
        AttributeError: 'NoneType' object has no attribute 'path'

    The test passes when nothing is logged at ERROR level or above while the
    emitter is being stopped from inside its own events callback.
    """
    tmpdir = p()

    orig = FSEventsEmitter.events_callback

    def cb(*args):
        # Stop the emitter first, then run the real callback on the stopped
        # emitter: this is the ordering that used to trigger the error.
        FSEventsEmitter.stop(emitter)
        orig(*args)

    with caplog.at_level(logging.ERROR), patch.object(FSEventsEmitter, "events_callback", new=cb):
        emitter = start_watching(path=tmpdir)
        try:
            # Less than 100 is not enough events to trigger the error
            for n in range(100):
                touch(p(f"{n}.txt"))
        finally:
            # Stop the emitter even if creating the files fails, like the
            # other emitter-based tests of this module do.
            emitter.stop()
        assert not caplog.records
128
129
def test_watcher_deletion_while_receiving_events_2(
    caplog: pytest.LogCaptureFixture,
    p: P,
    start_watching: StartWatching,
) -> None:
    """Stop an emitter at a random moment while many files are being created.

    Note: that test takes about 20 seconds to complete.

    Similar to the previous test; it guards against this other error seen when
    the watcher is stopped while there are events:

        Traceback (most recent call last):
            File "observers/fsevents.py", line 327, in events_callback
              self.queue_events(self.timeout, events)
            File "observers/fsevents.py", line 235, in queue_events
              self._queue_created_event(event, src_path, src_dirname)
            File "observers/fsevents.py", line 132, in _queue_created_event
              self.queue_event(cls(src_path))
            File "observers/fsevents.py", line 104, in queue_event
              if self._watch.is_recursive:
        AttributeError: 'NoneType' object has no attribute 'is_recursive'
    """

    def flood_with_files():
        # Less than 2000 is not enough events to trigger the error
        for index in range(2000):
            touch(p(f"{index}.txt"))

    def stop_after_random_delay(em):
        sleep(random())
        em.stop()

    def try_to_fail():
        emitter = start_watching(path=p())
        workers = (
            Thread(target=flood_with_files),
            Thread(target=stop_after_random_delay, args=(emitter,)),
        )

        try:
            for worker in workers:
                worker.start()
            for worker in workers:
                worker.join()
        finally:
            emitter.stop()

    attempts = 20  # the failure is random, so give it several chances to show up
    with caplog.at_level(logging.ERROR):
        for _ in range(attempts):
            try_to_fail()
            sleep(random())

        assert not caplog.records
183
184
def test_remove_watch_twice(start_watching: StartWatching) -> None:
    """Calling ``stop()`` several times on the same emitter must be accepted.

    Error this test guards against:

    ValueError: PyCapsule_GetPointer called with invalid PyCapsule object
    The above exception was the direct cause of the following exception:

    src/watchdog/utils/__init__.py:92: in stop
        self.on_thread_stop()

    src/watchdog/observers/fsevents.py:73: SystemError
        def on_thread_stop(self):
    >       _fsevents.remove_watch(self.watch)
    E       SystemError: <built-in function remove_watch> returned a result with an error set

    (FSEvents.framework) FSEventStreamStop(): failed assertion 'streamRef != NULL'
    (FSEvents.framework) FSEventStreamInvalidate(): failed assertion 'streamRef != NULL'
    (FSEvents.framework) FSEventStreamRelease(): failed assertion 'streamRef != NULL'
    """
    emitter = start_watching()
    # The first call must work; the repeated call has to be allowed as well.
    for _ in range(2):
        emitter.stop()
207
208
def test_unschedule_removed_folder(observer: BaseObserver, p: P) -> None:
    """Unschedule a watch after its folder has already been removed.

    Error this test guards against:

    TypeError: PyCObject_AsVoidPtr called with null pointer
    The above exception was the direct cause of the following exception:

    def on_thread_stop(self):
        if self.watch:
            _fsevents.remove_watch(self.watch)
    E       SystemError: <built-in function stop> returned a result with an error set

    (FSEvents.framework) FSEventStreamStop(): failed assertion 'streamRef != NULL'
    (FSEvents.framework) FSEventStreamInvalidate(): failed assertion 'streamRef != NULL'
    (FSEvents.framework) FSEventStreamRelease(): failed assertion 'streamRef != NULL'
    """
    folder = p("a")
    mkdir(folder)
    watch = observer.schedule(FileSystemEventHandler(), folder, recursive=False)
    rmdir(folder)
    # Short pause between deleting the folder and unscheduling its watch.
    sleep(0.1)
    observer.unschedule(watch)
229
230
def test_converting_cfstring_to_pyunicode(p: P, start_watching: StartWatching, event_queue: TestEventQueue) -> None:
    """The event for a directory with a non-ASCII name must carry that name.

    See https://github.com/gorakhargosh/watchdog/issues/762
    """
    unicode_name = "TéstClass"
    emitter = start_watching(path=p())

    try:
        mkdir(p(unicode_name))
        first_event, _ = event_queue.get()
        assert first_event.src_path.endswith(unicode_name)
    finally:
        emitter.stop()
245
246
def test_recursive_check_accepts_relative_paths(p: P) -> None:
    """See https://github.com/gorakhargosh/watchdog/issues/797

    The test code provided in the defect observes the current working directory
    using ".". Since the watch path wasn't normalized then that failed.
    This test emulates the scenario.
    """
    from watchdog.events import FileCreatedEvent, FileModifiedEvent, PatternMatchingEventHandler

    class TestEventHandler(PatternMatchingEventHandler):
        """Tick off the expected events as they are delivered."""

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            # the TestEventHandler instance is set to ignore_directories,
            # as such we won't get a DirModifiedEvent(p()) here.
            self.expected_events = [
                FileCreatedEvent(p("foo.json")),
                FileModifiedEvent(p("foo.json")),
            ]
            self.observed_events = set()

        def on_any_event(self, event):
            # list.remove() raises ValueError for an event that is not (or no
            # longer) expected.
            self.expected_events.remove(event)
            self.observed_events.add(event)

        def done(self):
            """Return True once every expected event has been received."""
            return not self.expected_events

    cwd = os.getcwd()
    os.chdir(p())
    try:
        # Everything after the chdir() is guarded so the working directory is
        # restored even when scheduling or starting the observer fails.
        event_handler = TestEventHandler(patterns=["*.json"], ignore_patterns=[], ignore_directories=True)
        observer = Observer()
        observer.schedule(event_handler, ".")
        observer.start()
        try:
            time.sleep(0.1)

            touch(p("foo.json"))
            # Monotonic clock: the 5 second budget is immune to wall-clock jumps.
            deadline = time.monotonic() + 5
            while not event_handler.done() and time.monotonic() < deadline:
                time.sleep(0.1)

            assert event_handler.done()
        finally:
            observer.stop()
            observer.join()
    finally:
        os.chdir(cwd)
293
294
def test_watchdog_recursive(p: P) -> None:
    """Changes made below a recursively watched directory must all be reported.

    See https://github.com/gorakhargosh/watchdog/issues/706
    """

    class Handler(FileSystemEventHandler):
        """Record the basename of the source path of every event."""

        def __init__(self):
            super().__init__()
            self.changes = []

        def on_any_event(self, event):
            self.changes.append(os.path.basename(event.src_path))

    handler = Handler()
    observer = Observer()

    watches = [observer.schedule(handler, str(p("")), recursive=True)]
    try:
        observer.start()
        time.sleep(0.1)

        touch(p("my0.txt"))
        mkdir(p("dir_rec"))
        touch(p("dir_rec", "my1.txt"))

        expected = {"dir_rec", "my0.txt", "my1.txt"}
        # Monotonic clock: the 5 second budget is immune to wall-clock jumps.
        deadline = time.monotonic() + 5
        while not expected.issubset(handler.changes) and time.monotonic() < deadline:
            time.sleep(0.2)

        assert expected.issubset(handler.changes), f"Did not find expected changes. Found: {handler.changes}"
    finally:
        try:
            for watch in watches:
                observer.unschedule(watch)
        finally:
            # Stop the observer even when unscheduling raised.
            observer.stop()
            # Same tolerance as the `observer` fixture of this module: a
            # RuntimeError from join() must not hide the test's own failure.
            with contextlib.suppress(RuntimeError):
                observer.join(1)
333