1from __future__ import annotations
2
3import contextlib
4import queue
5import threading
6from collections import defaultdict
7from pathlib import Path
8from typing import TYPE_CHECKING
9
10from watchdog.utils import BaseThread
11from watchdog.utils.bricks import SkipRepeatsQueue
12
13if TYPE_CHECKING:
14    from watchdog.events import FileSystemEvent, FileSystemEventHandler
15
16DEFAULT_EMITTER_TIMEOUT = 1.0  # in seconds
17DEFAULT_OBSERVER_TIMEOUT = 1.0  # in seconds
18
19
20class EventQueue(SkipRepeatsQueue):
21    """Thread-safe event queue based on a special queue that skips adding
22    the same event (:class:`FileSystemEvent`) multiple times consecutively.
23    Thus avoiding dispatching multiple event handling
24    calls when multiple identical events are produced quicker than an observer
25    can consume them.
26    """
27
28
29class ObservedWatch:
30    """An scheduled watch.
31
32    :param path:
33        Path string.
34    :param recursive:
35        ``True`` if watch is recursive; ``False`` otherwise.
36    :param event_filter:
37        Optional collection of :class:`watchdog.events.FileSystemEvent` to watch
38    """
39
40    def __init__(self, path: str | Path, *, recursive: bool, event_filter: list[type[FileSystemEvent]] | None = None):
41        self._path = str(path) if isinstance(path, Path) else path
42        self._is_recursive = recursive
43        self._event_filter = frozenset(event_filter) if event_filter is not None else None
44
45    @property
46    def path(self) -> str:
47        """The path that this watch monitors."""
48        return self._path
49
50    @property
51    def is_recursive(self) -> bool:
52        """Determines whether subdirectories are watched for the path."""
53        return self._is_recursive
54
55    @property
56    def event_filter(self) -> frozenset[type[FileSystemEvent]] | None:
57        """Collection of event types watched for the path"""
58        return self._event_filter
59
60    @property
61    def key(self) -> tuple[str, bool, frozenset[type[FileSystemEvent]] | None]:
62        return self.path, self.is_recursive, self.event_filter
63
64    def __eq__(self, watch: object) -> bool:
65        if not isinstance(watch, ObservedWatch):
66            return NotImplemented
67        return self.key == watch.key
68
69    def __ne__(self, watch: object) -> bool:
70        if not isinstance(watch, ObservedWatch):
71            return NotImplemented
72        return self.key != watch.key
73
74    def __hash__(self) -> int:
75        return hash(self.key)
76
77    def __repr__(self) -> str:
78        if self.event_filter is not None:
79            event_filter_str = "|".join(sorted(_cls.__name__ for _cls in self.event_filter))
80            event_filter_str = f", event_filter={event_filter_str}"
81        else:
82            event_filter_str = ""
83        return f"<{type(self).__name__}: path={self.path!r}, is_recursive={self.is_recursive}{event_filter_str}>"
84
85
86# Observer classes
87class EventEmitter(BaseThread):
88    """Producer thread base class subclassed by event emitters
89    that generate events and populate a queue with them.
90
91    :param event_queue:
92        The event queue to populate with generated events.
93    :type event_queue:
94        :class:`watchdog.events.EventQueue`
95    :param watch:
96        The watch to observe and produce events for.
97    :type watch:
98        :class:`ObservedWatch`
99    :param timeout:
100        Timeout (in seconds) between successive attempts at reading events.
101    :type timeout:
102        ``float``
103    :param event_filter:
104        Collection of event types to emit, or None for no filtering (default).
105    :type event_filter:
106        Iterable[:class:`watchdog.events.FileSystemEvent`] | None
107    """
108
109    def __init__(
110        self,
111        event_queue: EventQueue,
112        watch: ObservedWatch,
113        *,
114        timeout: float = DEFAULT_EMITTER_TIMEOUT,
115        event_filter: list[type[FileSystemEvent]] | None = None,
116    ) -> None:
117        super().__init__()
118        self._event_queue = event_queue
119        self._watch = watch
120        self._timeout = timeout
121        self._event_filter = frozenset(event_filter) if event_filter is not None else None
122
123    @property
124    def timeout(self) -> float:
125        """Blocking timeout for reading events."""
126        return self._timeout
127
128    @property
129    def watch(self) -> ObservedWatch:
130        """The watch associated with this emitter."""
131        return self._watch
132
133    def queue_event(self, event: FileSystemEvent) -> None:
134        """Queues a single event.
135
136        :param event:
137            Event to be queued.
138        :type event:
139            An instance of :class:`watchdog.events.FileSystemEvent`
140            or a subclass.
141        """
142        if self._event_filter is None or any(isinstance(event, cls) for cls in self._event_filter):
143            self._event_queue.put((event, self.watch))
144
145    def queue_events(self, timeout: float) -> None:
146        """Override this method to populate the event queue with events
147        per interval period.
148
149        :param timeout:
150            Timeout (in seconds) between successive attempts at
151            reading events.
152        :type timeout:
153            ``float``
154        """
155
156    def run(self) -> None:
157        while self.should_keep_running():
158            self.queue_events(self.timeout)
159
160
161class EventDispatcher(BaseThread):
162    """Consumer thread base class subclassed by event observer threads
163    that dispatch events from an event queue to appropriate event handlers.
164
165    :param timeout:
166        Timeout value (in seconds) passed to emitters
167        constructions in the child class BaseObserver.
168    :type timeout:
169        ``float``
170    """
171
172    stop_event = object()
173    """Event inserted into the queue to signal a requested stop."""
174
175    def __init__(self, *, timeout: float = DEFAULT_OBSERVER_TIMEOUT) -> None:
176        super().__init__()
177        self._event_queue = EventQueue()
178        self._timeout = timeout
179
180    @property
181    def timeout(self) -> float:
182        """Timeout value to construct emitters with."""
183        return self._timeout
184
185    def stop(self) -> None:
186        BaseThread.stop(self)
187        with contextlib.suppress(queue.Full):
188            self.event_queue.put_nowait(EventDispatcher.stop_event)
189
190    @property
191    def event_queue(self) -> EventQueue:
192        """The event queue which is populated with file system events
193        by emitters and from which events are dispatched by a dispatcher
194        thread.
195        """
196        return self._event_queue
197
198    def dispatch_events(self, event_queue: EventQueue) -> None:
199        """Override this method to consume events from an event queue, blocking
200        on the queue for the specified timeout before raising :class:`queue.Empty`.
201
202        :param event_queue:
203            Event queue to populate with one set of events.
204        :type event_queue:
205            :class:`EventQueue`
206        :raises:
207            :class:`queue.Empty`
208        """
209
210    def run(self) -> None:
211        while self.should_keep_running():
212            try:
213                self.dispatch_events(self.event_queue)
214            except queue.Empty:
215                continue
216
217
218class BaseObserver(EventDispatcher):
219    """Base observer."""
220
221    def __init__(self, emitter_class: type[EventEmitter], *, timeout: float = DEFAULT_OBSERVER_TIMEOUT) -> None:
222        super().__init__(timeout=timeout)
223        self._emitter_class = emitter_class
224        self._lock = threading.RLock()
225        self._watches: set[ObservedWatch] = set()
226        self._handlers: defaultdict[ObservedWatch, set[FileSystemEventHandler]] = defaultdict(set)
227        self._emitters: set[EventEmitter] = set()
228        self._emitter_for_watch: dict[ObservedWatch, EventEmitter] = {}
229
230    def _add_emitter(self, emitter: EventEmitter) -> None:
231        self._emitter_for_watch[emitter.watch] = emitter
232        self._emitters.add(emitter)
233
234    def _remove_emitter(self, emitter: EventEmitter) -> None:
235        del self._emitter_for_watch[emitter.watch]
236        self._emitters.remove(emitter)
237        emitter.stop()
238        with contextlib.suppress(RuntimeError):
239            emitter.join()
240
241    def _clear_emitters(self) -> None:
242        for emitter in self._emitters:
243            emitter.stop()
244        for emitter in self._emitters:
245            with contextlib.suppress(RuntimeError):
246                emitter.join()
247        self._emitters.clear()
248        self._emitter_for_watch.clear()
249
250    def _add_handler_for_watch(self, event_handler: FileSystemEventHandler, watch: ObservedWatch) -> None:
251        self._handlers[watch].add(event_handler)
252
253    def _remove_handlers_for_watch(self, watch: ObservedWatch) -> None:
254        del self._handlers[watch]
255
256    @property
257    def emitters(self) -> set[EventEmitter]:
258        """Returns event emitter created by this observer."""
259        return self._emitters
260
261    def start(self) -> None:
262        for emitter in self._emitters.copy():
263            try:
264                emitter.start()
265            except Exception:
266                self._remove_emitter(emitter)
267                raise
268        super().start()
269
270    def schedule(
271        self,
272        event_handler: FileSystemEventHandler,
273        path: str,
274        *,
275        recursive: bool = False,
276        event_filter: list[type[FileSystemEvent]] | None = None,
277    ) -> ObservedWatch:
278        """Schedules watching a path and calls appropriate methods specified
279        in the given event handler in response to file system events.
280
281        :param event_handler:
282            An event handler instance that has appropriate event handling
283            methods which will be called by the observer in response to
284            file system events.
285        :type event_handler:
286            :class:`watchdog.events.FileSystemEventHandler` or a subclass
287        :param path:
288            Directory path that will be monitored.
289        :type path:
290            ``str``
291        :param recursive:
292            ``True`` if events will be emitted for sub-directories
293            traversed recursively; ``False`` otherwise.
294        :type recursive:
295            ``bool``
296        :param event_filter:
297            Collection of event types to emit, or None for no filtering (default).
298        :type event_filter:
299            Iterable[:class:`watchdog.events.FileSystemEvent`] | None
300        :return:
301            An :class:`ObservedWatch` object instance representing
302            a watch.
303        """
304        with self._lock:
305            watch = ObservedWatch(path, recursive=recursive, event_filter=event_filter)
306            self._add_handler_for_watch(event_handler, watch)
307
308            # If we don't have an emitter for this watch already, create it.
309            if watch not in self._emitter_for_watch:
310                emitter = self._emitter_class(self.event_queue, watch, timeout=self.timeout, event_filter=event_filter)
311                if self.is_alive():
312                    emitter.start()
313                self._add_emitter(emitter)
314            self._watches.add(watch)
315        return watch
316
317    def add_handler_for_watch(self, event_handler: FileSystemEventHandler, watch: ObservedWatch) -> None:
318        """Adds a handler for the given watch.
319
320        :param event_handler:
321            An event handler instance that has appropriate event handling
322            methods which will be called by the observer in response to
323            file system events.
324        :type event_handler:
325            :class:`watchdog.events.FileSystemEventHandler` or a subclass
326        :param watch:
327            The watch to add a handler for.
328        :type watch:
329            An instance of :class:`ObservedWatch` or a subclass of
330            :class:`ObservedWatch`
331        """
332        with self._lock:
333            self._add_handler_for_watch(event_handler, watch)
334
335    def remove_handler_for_watch(self, event_handler: FileSystemEventHandler, watch: ObservedWatch) -> None:
336        """Removes a handler for the given watch.
337
338        :param event_handler:
339            An event handler instance that has appropriate event handling
340            methods which will be called by the observer in response to
341            file system events.
342        :type event_handler:
343            :class:`watchdog.events.FileSystemEventHandler` or a subclass
344        :param watch:
345            The watch to remove a handler for.
346        :type watch:
347            An instance of :class:`ObservedWatch` or a subclass of
348            :class:`ObservedWatch`
349        """
350        with self._lock:
351            self._handlers[watch].remove(event_handler)
352
353    def unschedule(self, watch: ObservedWatch) -> None:
354        """Unschedules a watch.
355
356        :param watch:
357            The watch to unschedule.
358        :type watch:
359            An instance of :class:`ObservedWatch` or a subclass of
360            :class:`ObservedWatch`
361        """
362        with self._lock:
363            emitter = self._emitter_for_watch[watch]
364            del self._handlers[watch]
365            self._remove_emitter(emitter)
366            self._watches.remove(watch)
367
368    def unschedule_all(self) -> None:
369        """Unschedules all watches and detaches all associated event handlers."""
370        with self._lock:
371            self._handlers.clear()
372            self._clear_emitters()
373            self._watches.clear()
374
375    def on_thread_stop(self) -> None:
376        self.unschedule_all()
377
378    def dispatch_events(self, event_queue: EventQueue) -> None:
379        entry = event_queue.get(block=True)
380        if entry is EventDispatcher.stop_event:
381            return
382
383        event, watch = entry
384
385        with self._lock:
386            # To allow unschedule/stop and safe removal of event handlers
387            # within event handlers itself, check if the handler is still
388            # registered after every dispatch.
389            for handler in self._handlers[watch].copy():
390                if handler in self._handlers[watch]:
391                    handler.dispatch(event)
392        event_queue.task_done()
393