from __future__ import annotations import os from queue import Empty, Queue from time import sleep import pytest from watchdog.events import ( DirCreatedEvent, DirDeletedEvent, DirModifiedEvent, DirMovedEvent, FileCreatedEvent, FileDeletedEvent, FileModifiedEvent, FileMovedEvent, ) from watchdog.observers.api import ObservedWatch from watchdog.observers.polling import PollingEmitter as Emitter from .shell import mkdir, mkdtemp, msize, mv, rm, touch SLEEP_TIME = 0.4 TEMP_DIR = mkdtemp() def p(*args): """ Convenience function to join the temporary directory path with the provided arguments. """ return os.path.join(TEMP_DIR, *args) @pytest.fixture def event_queue(): return Queue() @pytest.fixture def emitter(event_queue): watch = ObservedWatch(TEMP_DIR, recursive=True) em = Emitter(event_queue, watch, timeout=0.2) em.start() yield em em.stop() em.join(5) def test___init__(event_queue, emitter): sleep(SLEEP_TIME) mkdir(p("project")) sleep(SLEEP_TIME) mkdir(p("project", "blah")) sleep(SLEEP_TIME) touch(p("afile")) sleep(SLEEP_TIME) touch(p("fromfile")) sleep(SLEEP_TIME) mv(p("fromfile"), p("project", "tofile")) sleep(SLEEP_TIME) touch(p("afile")) sleep(SLEEP_TIME) mv(p("project", "blah"), p("project", "boo")) sleep(SLEEP_TIME) rm(p("project"), recursive=True) sleep(SLEEP_TIME) rm(p("afile")) sleep(SLEEP_TIME) msize(p("bfile")) sleep(SLEEP_TIME) rm(p("bfile")) sleep(SLEEP_TIME) emitter.stop() # What we need here for the tests to pass is a collection type # that is: # * unordered # * non-unique # A multiset! Python's collections.Counter class seems appropriate. expected = { DirModifiedEvent(p()), DirCreatedEvent(p("project")), DirModifiedEvent(p("project")), DirCreatedEvent(p("project", "blah")), FileCreatedEvent(p("afile")), DirModifiedEvent(p()), FileCreatedEvent(p("fromfile")), DirModifiedEvent(p()), DirModifiedEvent(p()), FileModifiedEvent(p("afile")), DirModifiedEvent(p("project")), DirModifiedEvent(p()), FileDeletedEvent(p("project", "tofile")), DirDeletedEvent(p("project", "boo")), DirDeletedEvent(p("project")), DirModifiedEvent(p()), FileDeletedEvent(p("afile")), DirModifiedEvent(p()), FileCreatedEvent(p("bfile")), FileModifiedEvent(p("bfile")), DirModifiedEvent(p()), FileDeletedEvent(p("bfile")), } expected.add(FileMovedEvent(p("fromfile"), p("project", "tofile"))) expected.add(DirMovedEvent(p("project", "blah"), p("project", "boo"))) got = set() while True: try: event, _ = event_queue.get_nowait() got.add(event) except Empty: break assert expected == got def test_delete_watched_dir(event_queue, emitter): rm(p(""), recursive=True) sleep(SLEEP_TIME) emitter.stop() # What we need here for the tests to pass is a collection type # that is: # * unordered # * non-unique # A multiset! Python's collections.Counter class seems appropriate. expected = { DirDeletedEvent(os.path.dirname(p(""))), } got = set() while True: try: event, _ = event_queue.get_nowait() got.add(event) except Empty: break assert expected == got