1from __future__ import annotations 2 3import contextlib 4import threading 5from typing import TYPE_CHECKING 6from unittest.mock import patch 7 8import pytest 9 10from watchdog.events import FileModifiedEvent, FileSystemEventHandler 11from watchdog.observers.api import BaseObserver, EventEmitter 12 13if TYPE_CHECKING: 14 from collections.abc import Iterator 15 16 17@pytest.fixture 18def observer() -> Iterator[BaseObserver]: 19 obs = BaseObserver(EventEmitter) 20 yield obs 21 obs.stop() 22 with contextlib.suppress(RuntimeError): 23 obs.join() 24 25 26@pytest.fixture 27def observer2(): 28 obs = BaseObserver(EventEmitter) 29 yield obs 30 obs.stop() 31 with contextlib.suppress(RuntimeError): 32 obs.join() 33 34 35def test_schedule_should_start_emitter_if_running(observer): 36 observer.start() 37 observer.schedule(None, "") 38 (emitter,) = observer.emitters 39 assert emitter.is_alive() 40 41 42def test_schedule_should_not_start_emitter_if_not_running(observer): 43 observer.schedule(None, "") 44 (emitter,) = observer.emitters 45 assert not emitter.is_alive() 46 47 48def test_start_should_start_emitter(observer): 49 observer.schedule(None, "") 50 observer.start() 51 (emitter,) = observer.emitters 52 assert emitter.is_alive() 53 54 55def test_stop_should_stop_emitter(observer): 56 observer.schedule(None, "") 57 observer.start() 58 (emitter,) = observer.emitters 59 assert emitter.is_alive() 60 observer.stop() 61 observer.join() 62 assert not observer.is_alive() 63 assert not emitter.is_alive() 64 65 66def test_unschedule_self(observer): 67 """ 68 Tests that unscheduling a watch from within an event handler correctly 69 correctly unregisters emitter and handler without deadlocking. 70 """ 71 72 class EventHandler(FileSystemEventHandler): 73 def on_modified(self, event): 74 observer.unschedule(watch) 75 unschedule_finished.set() 76 77 unschedule_finished = threading.Event() 78 watch = observer.schedule(EventHandler(), "") 79 observer.start() 80 81 (emitter,) = observer.emitters 82 emitter.queue_event(FileModifiedEvent("")) 83 84 assert unschedule_finished.wait() 85 assert len(observer.emitters) == 0 86 87 88def test_schedule_after_unschedule_all(observer): 89 observer.start() 90 observer.schedule(None, "") 91 assert len(observer.emitters) == 1 92 93 observer.unschedule_all() 94 assert len(observer.emitters) == 0 95 96 observer.schedule(None, "") 97 assert len(observer.emitters) == 1 98 99 100def test_2_observers_on_the_same_path(observer, observer2): 101 assert observer is not observer2 102 103 observer.schedule(None, "") 104 assert len(observer.emitters) == 1 105 106 observer2.schedule(None, "") 107 assert len(observer2.emitters) == 1 108 109 110def test_start_failure_should_not_prevent_further_try(observer): 111 observer.schedule(None, "") 112 emitters = observer.emitters 113 assert len(emitters) == 1 114 115 # Make the emitter to fail on start() 116 117 def mocked_start(): 118 raise OSError("Mock'ed!") 119 120 emitter = next(iter(emitters)) 121 with patch.object(emitter, "start", new=mocked_start), pytest.raises(OSError, match="Mock'ed!"): 122 observer.start() 123 # The emitter should be removed from the list 124 assert len(observer.emitters) == 0 125 126 # Restoring the original behavior should work like there never be emitters 127 observer.start() 128 assert len(observer.emitters) == 0 129 130 # Re-scheduling the watch should work 131 observer.schedule(None, "") 132 assert len(observer.emitters) == 1 133 134 135def test_schedule_failure_should_not_prevent_future_schedules(observer): 136 observer.start() 137 138 # Make the emitter fail on start(), and subsequently the observer to fail on schedule() 139 def bad_start(_): 140 raise OSError("Mock'ed!") 141 142 with patch.object(EventEmitter, "start", new=bad_start), pytest.raises(OSError, match="Mock'ed!"): 143 observer.schedule(None, "") 144 # The emitter should not be in the list 145 assert not observer.emitters 146 147 # Re-scheduling the watch should work 148 observer.schedule(None, "") 149 assert len(observer.emitters) == 1 150