1from __future__ import annotations 2 3import contextlib 4import queue 5import threading 6from collections import defaultdict 7from pathlib import Path 8from typing import TYPE_CHECKING 9 10from watchdog.utils import BaseThread 11from watchdog.utils.bricks import SkipRepeatsQueue 12 13if TYPE_CHECKING: 14 from watchdog.events import FileSystemEvent, FileSystemEventHandler 15 16DEFAULT_EMITTER_TIMEOUT = 1.0 # in seconds 17DEFAULT_OBSERVER_TIMEOUT = 1.0 # in seconds 18 19 20class EventQueue(SkipRepeatsQueue): 21 """Thread-safe event queue based on a special queue that skips adding 22 the same event (:class:`FileSystemEvent`) multiple times consecutively. 23 Thus avoiding dispatching multiple event handling 24 calls when multiple identical events are produced quicker than an observer 25 can consume them. 26 """ 27 28 29class ObservedWatch: 30 """An scheduled watch. 31 32 :param path: 33 Path string. 34 :param recursive: 35 ``True`` if watch is recursive; ``False`` otherwise. 36 :param event_filter: 37 Optional collection of :class:`watchdog.events.FileSystemEvent` to watch 38 """ 39 40 def __init__(self, path: str | Path, *, recursive: bool, event_filter: list[type[FileSystemEvent]] | None = None): 41 self._path = str(path) if isinstance(path, Path) else path 42 self._is_recursive = recursive 43 self._event_filter = frozenset(event_filter) if event_filter is not None else None 44 45 @property 46 def path(self) -> str: 47 """The path that this watch monitors.""" 48 return self._path 49 50 @property 51 def is_recursive(self) -> bool: 52 """Determines whether subdirectories are watched for the path.""" 53 return self._is_recursive 54 55 @property 56 def event_filter(self) -> frozenset[type[FileSystemEvent]] | None: 57 """Collection of event types watched for the path""" 58 return self._event_filter 59 60 @property 61 def key(self) -> tuple[str, bool, frozenset[type[FileSystemEvent]] | None]: 62 return self.path, self.is_recursive, self.event_filter 63 64 def __eq__(self, watch: object) -> bool: 65 if not isinstance(watch, ObservedWatch): 66 return NotImplemented 67 return self.key == watch.key 68 69 def __ne__(self, watch: object) -> bool: 70 if not isinstance(watch, ObservedWatch): 71 return NotImplemented 72 return self.key != watch.key 73 74 def __hash__(self) -> int: 75 return hash(self.key) 76 77 def __repr__(self) -> str: 78 if self.event_filter is not None: 79 event_filter_str = "|".join(sorted(_cls.__name__ for _cls in self.event_filter)) 80 event_filter_str = f", event_filter={event_filter_str}" 81 else: 82 event_filter_str = "" 83 return f"<{type(self).__name__}: path={self.path!r}, is_recursive={self.is_recursive}{event_filter_str}>" 84 85 86# Observer classes 87class EventEmitter(BaseThread): 88 """Producer thread base class subclassed by event emitters 89 that generate events and populate a queue with them. 90 91 :param event_queue: 92 The event queue to populate with generated events. 93 :type event_queue: 94 :class:`watchdog.events.EventQueue` 95 :param watch: 96 The watch to observe and produce events for. 97 :type watch: 98 :class:`ObservedWatch` 99 :param timeout: 100 Timeout (in seconds) between successive attempts at reading events. 101 :type timeout: 102 ``float`` 103 :param event_filter: 104 Collection of event types to emit, or None for no filtering (default). 105 :type event_filter: 106 Iterable[:class:`watchdog.events.FileSystemEvent`] | None 107 """ 108 109 def __init__( 110 self, 111 event_queue: EventQueue, 112 watch: ObservedWatch, 113 *, 114 timeout: float = DEFAULT_EMITTER_TIMEOUT, 115 event_filter: list[type[FileSystemEvent]] | None = None, 116 ) -> None: 117 super().__init__() 118 self._event_queue = event_queue 119 self._watch = watch 120 self._timeout = timeout 121 self._event_filter = frozenset(event_filter) if event_filter is not None else None 122 123 @property 124 def timeout(self) -> float: 125 """Blocking timeout for reading events.""" 126 return self._timeout 127 128 @property 129 def watch(self) -> ObservedWatch: 130 """The watch associated with this emitter.""" 131 return self._watch 132 133 def queue_event(self, event: FileSystemEvent) -> None: 134 """Queues a single event. 135 136 :param event: 137 Event to be queued. 138 :type event: 139 An instance of :class:`watchdog.events.FileSystemEvent` 140 or a subclass. 141 """ 142 if self._event_filter is None or any(isinstance(event, cls) for cls in self._event_filter): 143 self._event_queue.put((event, self.watch)) 144 145 def queue_events(self, timeout: float) -> None: 146 """Override this method to populate the event queue with events 147 per interval period. 148 149 :param timeout: 150 Timeout (in seconds) between successive attempts at 151 reading events. 152 :type timeout: 153 ``float`` 154 """ 155 156 def run(self) -> None: 157 while self.should_keep_running(): 158 self.queue_events(self.timeout) 159 160 161class EventDispatcher(BaseThread): 162 """Consumer thread base class subclassed by event observer threads 163 that dispatch events from an event queue to appropriate event handlers. 164 165 :param timeout: 166 Timeout value (in seconds) passed to emitters 167 constructions in the child class BaseObserver. 168 :type timeout: 169 ``float`` 170 """ 171 172 stop_event = object() 173 """Event inserted into the queue to signal a requested stop.""" 174 175 def __init__(self, *, timeout: float = DEFAULT_OBSERVER_TIMEOUT) -> None: 176 super().__init__() 177 self._event_queue = EventQueue() 178 self._timeout = timeout 179 180 @property 181 def timeout(self) -> float: 182 """Timeout value to construct emitters with.""" 183 return self._timeout 184 185 def stop(self) -> None: 186 BaseThread.stop(self) 187 with contextlib.suppress(queue.Full): 188 self.event_queue.put_nowait(EventDispatcher.stop_event) 189 190 @property 191 def event_queue(self) -> EventQueue: 192 """The event queue which is populated with file system events 193 by emitters and from which events are dispatched by a dispatcher 194 thread. 195 """ 196 return self._event_queue 197 198 def dispatch_events(self, event_queue: EventQueue) -> None: 199 """Override this method to consume events from an event queue, blocking 200 on the queue for the specified timeout before raising :class:`queue.Empty`. 201 202 :param event_queue: 203 Event queue to populate with one set of events. 204 :type event_queue: 205 :class:`EventQueue` 206 :raises: 207 :class:`queue.Empty` 208 """ 209 210 def run(self) -> None: 211 while self.should_keep_running(): 212 try: 213 self.dispatch_events(self.event_queue) 214 except queue.Empty: 215 continue 216 217 218class BaseObserver(EventDispatcher): 219 """Base observer.""" 220 221 def __init__(self, emitter_class: type[EventEmitter], *, timeout: float = DEFAULT_OBSERVER_TIMEOUT) -> None: 222 super().__init__(timeout=timeout) 223 self._emitter_class = emitter_class 224 self._lock = threading.RLock() 225 self._watches: set[ObservedWatch] = set() 226 self._handlers: defaultdict[ObservedWatch, set[FileSystemEventHandler]] = defaultdict(set) 227 self._emitters: set[EventEmitter] = set() 228 self._emitter_for_watch: dict[ObservedWatch, EventEmitter] = {} 229 230 def _add_emitter(self, emitter: EventEmitter) -> None: 231 self._emitter_for_watch[emitter.watch] = emitter 232 self._emitters.add(emitter) 233 234 def _remove_emitter(self, emitter: EventEmitter) -> None: 235 del self._emitter_for_watch[emitter.watch] 236 self._emitters.remove(emitter) 237 emitter.stop() 238 with contextlib.suppress(RuntimeError): 239 emitter.join() 240 241 def _clear_emitters(self) -> None: 242 for emitter in self._emitters: 243 emitter.stop() 244 for emitter in self._emitters: 245 with contextlib.suppress(RuntimeError): 246 emitter.join() 247 self._emitters.clear() 248 self._emitter_for_watch.clear() 249 250 def _add_handler_for_watch(self, event_handler: FileSystemEventHandler, watch: ObservedWatch) -> None: 251 self._handlers[watch].add(event_handler) 252 253 def _remove_handlers_for_watch(self, watch: ObservedWatch) -> None: 254 del self._handlers[watch] 255 256 @property 257 def emitters(self) -> set[EventEmitter]: 258 """Returns event emitter created by this observer.""" 259 return self._emitters 260 261 def start(self) -> None: 262 for emitter in self._emitters.copy(): 263 try: 264 emitter.start() 265 except Exception: 266 self._remove_emitter(emitter) 267 raise 268 super().start() 269 270 def schedule( 271 self, 272 event_handler: FileSystemEventHandler, 273 path: str, 274 *, 275 recursive: bool = False, 276 event_filter: list[type[FileSystemEvent]] | None = None, 277 ) -> ObservedWatch: 278 """Schedules watching a path and calls appropriate methods specified 279 in the given event handler in response to file system events. 280 281 :param event_handler: 282 An event handler instance that has appropriate event handling 283 methods which will be called by the observer in response to 284 file system events. 285 :type event_handler: 286 :class:`watchdog.events.FileSystemEventHandler` or a subclass 287 :param path: 288 Directory path that will be monitored. 289 :type path: 290 ``str`` 291 :param recursive: 292 ``True`` if events will be emitted for sub-directories 293 traversed recursively; ``False`` otherwise. 294 :type recursive: 295 ``bool`` 296 :param event_filter: 297 Collection of event types to emit, or None for no filtering (default). 298 :type event_filter: 299 Iterable[:class:`watchdog.events.FileSystemEvent`] | None 300 :return: 301 An :class:`ObservedWatch` object instance representing 302 a watch. 303 """ 304 with self._lock: 305 watch = ObservedWatch(path, recursive=recursive, event_filter=event_filter) 306 self._add_handler_for_watch(event_handler, watch) 307 308 # If we don't have an emitter for this watch already, create it. 309 if watch not in self._emitter_for_watch: 310 emitter = self._emitter_class(self.event_queue, watch, timeout=self.timeout, event_filter=event_filter) 311 if self.is_alive(): 312 emitter.start() 313 self._add_emitter(emitter) 314 self._watches.add(watch) 315 return watch 316 317 def add_handler_for_watch(self, event_handler: FileSystemEventHandler, watch: ObservedWatch) -> None: 318 """Adds a handler for the given watch. 319 320 :param event_handler: 321 An event handler instance that has appropriate event handling 322 methods which will be called by the observer in response to 323 file system events. 324 :type event_handler: 325 :class:`watchdog.events.FileSystemEventHandler` or a subclass 326 :param watch: 327 The watch to add a handler for. 328 :type watch: 329 An instance of :class:`ObservedWatch` or a subclass of 330 :class:`ObservedWatch` 331 """ 332 with self._lock: 333 self._add_handler_for_watch(event_handler, watch) 334 335 def remove_handler_for_watch(self, event_handler: FileSystemEventHandler, watch: ObservedWatch) -> None: 336 """Removes a handler for the given watch. 337 338 :param event_handler: 339 An event handler instance that has appropriate event handling 340 methods which will be called by the observer in response to 341 file system events. 342 :type event_handler: 343 :class:`watchdog.events.FileSystemEventHandler` or a subclass 344 :param watch: 345 The watch to remove a handler for. 346 :type watch: 347 An instance of :class:`ObservedWatch` or a subclass of 348 :class:`ObservedWatch` 349 """ 350 with self._lock: 351 self._handlers[watch].remove(event_handler) 352 353 def unschedule(self, watch: ObservedWatch) -> None: 354 """Unschedules a watch. 355 356 :param watch: 357 The watch to unschedule. 358 :type watch: 359 An instance of :class:`ObservedWatch` or a subclass of 360 :class:`ObservedWatch` 361 """ 362 with self._lock: 363 emitter = self._emitter_for_watch[watch] 364 del self._handlers[watch] 365 self._remove_emitter(emitter) 366 self._watches.remove(watch) 367 368 def unschedule_all(self) -> None: 369 """Unschedules all watches and detaches all associated event handlers.""" 370 with self._lock: 371 self._handlers.clear() 372 self._clear_emitters() 373 self._watches.clear() 374 375 def on_thread_stop(self) -> None: 376 self.unschedule_all() 377 378 def dispatch_events(self, event_queue: EventQueue) -> None: 379 entry = event_queue.get(block=True) 380 if entry is EventDispatcher.stop_event: 381 return 382 383 event, watch = entry 384 385 with self._lock: 386 # To allow unschedule/stop and safe removal of event handlers 387 # within event handlers itself, check if the handler is still 388 # registered after every dispatch. 389 for handler in self._handlers[watch].copy(): 390 if handler in self._handlers[watch]: 391 handler.dispatch(event) 392 event_queue.task_done() 393