from __future__ import annotations import pytest from watchdog.utils import platform if not platform.is_linux(): pytest.skip("GNU/Linux only.", allow_module_level=True) import os import random import time from watchdog.observers.inotify_buffer import InotifyBuffer from .shell import mkdir, mount_tmpfs, mv, rm, touch, unmount def wait_for_move_event(read_event): while True: event = read_event() if isinstance(event, tuple) or event.is_move: return event @pytest.mark.timeout(5) def test_move_from(p): mkdir(p("dir1")) mkdir(p("dir2")) touch(p("dir1", "a")) inotify = InotifyBuffer(p("dir1").encode()) mv(p("dir1", "a"), p("dir2", "b")) event = wait_for_move_event(inotify.read_event) assert event.is_moved_from assert event.src_path == p("dir1", "a").encode() inotify.close() @pytest.mark.timeout(5) def test_move_to(p): mkdir(p("dir1")) mkdir(p("dir2")) touch(p("dir1", "a")) inotify = InotifyBuffer(p("dir2").encode()) mv(p("dir1", "a"), p("dir2", "b")) event = wait_for_move_event(inotify.read_event) assert event.is_moved_to assert event.src_path == p("dir2", "b").encode() inotify.close() @pytest.mark.timeout(5) def test_move_internal(p): mkdir(p("dir1")) mkdir(p("dir2")) touch(p("dir1", "a")) inotify = InotifyBuffer(p("").encode(), recursive=True) mv(p("dir1", "a"), p("dir2", "b")) frm, to = wait_for_move_event(inotify.read_event) assert frm.src_path == p("dir1", "a").encode() assert to.src_path == p("dir2", "b").encode() inotify.close() @pytest.mark.timeout(10) def test_move_internal_batch(p): n = 100 mkdir(p("dir1")) mkdir(p("dir2")) files = [str(i) for i in range(n)] for f in files: touch(p("dir1", f)) inotify = InotifyBuffer(p("").encode(), recursive=True) random.shuffle(files) for f in files: mv(p("dir1", f), p("dir2", f)) # Check that all n events are paired for _ in range(n): frm, to = wait_for_move_event(inotify.read_event) assert os.path.dirname(frm.src_path).endswith(b"/dir1") assert os.path.dirname(to.src_path).endswith(b"/dir2") assert frm.name == to.name inotify.close() @pytest.mark.timeout(5) def test_delete_watched_directory(p): mkdir(p("dir")) inotify = InotifyBuffer(p("dir").encode()) rm(p("dir"), recursive=True) # Wait for the event to be picked up inotify.read_event() # Ensure InotifyBuffer shuts down cleanly without raising an exception inotify.close() @pytest.mark.timeout(5) @pytest.mark.skipif("GITHUB_REF" not in os.environ, reason="sudo password prompt") def test_unmount_watched_directory_filesystem(p): mkdir(p("dir1")) mount_tmpfs(p("dir1")) mkdir(p("dir1/dir2")) inotify = InotifyBuffer(p("dir1/dir2").encode()) unmount(p("dir1")) # Wait for the event to be picked up inotify.read_event() # Ensure InotifyBuffer shuts down cleanly without raising an exception inotify.close() assert not inotify.is_alive() def delay_call(function, seconds): def delayed(*args, **kwargs): time.sleep(seconds) return function(*args, **kwargs) return delayed class InotifyBufferDelayedRead(InotifyBuffer): def run(self, *args, **kwargs): # Introduce a delay to trigger the race condition where the file descriptor is # closed prior to a read being triggered. self._inotify.read_events = delay_call(self._inotify.read_events, 1) return super().run(*args, **kwargs) @pytest.mark.parametrize(argnames="cls", argvalues=[InotifyBuffer, InotifyBufferDelayedRead]) def test_close_should_terminate_thread(p, cls): inotify = cls(p("").encode(), recursive=True) assert inotify.is_alive() inotify.close() assert not inotify.is_alive()