from __future__ import annotations import dataclasses import os from queue import Queue from typing import Protocol from watchdog.events import FileSystemEvent from watchdog.observers.api import EventEmitter, ObservedWatch from watchdog.utils import platform Emitter: type[EventEmitter] if platform.is_linux(): from watchdog.observers.inotify import InotifyEmitter as Emitter from watchdog.observers.inotify import InotifyFullEmitter elif platform.is_darwin(): from watchdog.observers.fsevents import FSEventsEmitter as Emitter elif platform.is_windows(): from watchdog.observers.read_directory_changes import WindowsApiEmitter as Emitter elif platform.is_bsd(): from watchdog.observers.kqueue import KqueueEmitter as Emitter class P(Protocol): def __call__(self, *args: str) -> str: ... class StartWatching(Protocol): def __call__( self, *, path: bytes | str | None = ..., use_full_emitter: bool = ..., recursive: bool = ..., ) -> EventEmitter: ... class ExpectEvent(Protocol): def __call__(self, expected_event: FileSystemEvent, *, timeout: float = ...) -> None: ... TestEventQueue = Queue[tuple[FileSystemEvent, ObservedWatch]] @dataclasses.dataclass() class Helper: tmp: str emitters: list[EventEmitter] = dataclasses.field(default_factory=list) event_queue: TestEventQueue = dataclasses.field(default_factory=Queue) def joinpath(self, *args: str) -> str: return os.path.join(self.tmp, *args) def start_watching( self, *, path: bytes | str | None = None, use_full_emitter: bool = False, recursive: bool = True, ) -> EventEmitter: # TODO: check if other platforms expect the trailing slash (e.g. `p('')`) path = self.tmp if path is None else path watcher = ObservedWatch(path, recursive=recursive) emitter_cls = InotifyFullEmitter if platform.is_linux() and use_full_emitter else Emitter emitter = emitter_cls(self.event_queue, watcher) if platform.is_darwin(): # TODO: I think this could be better... .suppress_history should maybe # become a common attribute. from watchdog.observers.fsevents import FSEventsEmitter assert isinstance(emitter, FSEventsEmitter) emitter.suppress_history = True self.emitters.append(emitter) emitter.start() return emitter def expect_event(self, expected_event: FileSystemEvent, timeout: float = 2) -> None: """Utility function to wait up to `timeout` seconds for an `event_type` for `path` to show up in the queue. Provides some robustness for the otherwise flaky nature of asynchronous notifications. """ assert self.event_queue.get(timeout=timeout)[0] == expected_event def close(self) -> None: for emitter in self.emitters: emitter.stop() for emitter in self.emitters: if emitter.is_alive(): emitter.join(5) alive = [emitter.is_alive() for emitter in self.emitters] self.emitters = [] assert alive == [False] * len(alive)