1from __future__ import annotations 2 3import pytest 4 5from watchdog.utils import platform 6 7if not platform.is_linux(): 8 pytest.skip("GNU/Linux only.", allow_module_level=True) 9 10import os 11import random 12import time 13 14from watchdog.observers.inotify_buffer import InotifyBuffer 15 16from .shell import mkdir, mount_tmpfs, mv, rm, touch, unmount 17 18 19def wait_for_move_event(read_event): 20 while True: 21 event = read_event() 22 if isinstance(event, tuple) or event.is_move: 23 return event 24 25 26@pytest.mark.timeout(5) 27def test_move_from(p): 28 mkdir(p("dir1")) 29 mkdir(p("dir2")) 30 touch(p("dir1", "a")) 31 32 inotify = InotifyBuffer(p("dir1").encode()) 33 mv(p("dir1", "a"), p("dir2", "b")) 34 event = wait_for_move_event(inotify.read_event) 35 assert event.is_moved_from 36 assert event.src_path == p("dir1", "a").encode() 37 inotify.close() 38 39 40@pytest.mark.timeout(5) 41def test_move_to(p): 42 mkdir(p("dir1")) 43 mkdir(p("dir2")) 44 touch(p("dir1", "a")) 45 46 inotify = InotifyBuffer(p("dir2").encode()) 47 mv(p("dir1", "a"), p("dir2", "b")) 48 event = wait_for_move_event(inotify.read_event) 49 assert event.is_moved_to 50 assert event.src_path == p("dir2", "b").encode() 51 inotify.close() 52 53 54@pytest.mark.timeout(5) 55def test_move_internal(p): 56 mkdir(p("dir1")) 57 mkdir(p("dir2")) 58 touch(p("dir1", "a")) 59 60 inotify = InotifyBuffer(p("").encode(), recursive=True) 61 mv(p("dir1", "a"), p("dir2", "b")) 62 frm, to = wait_for_move_event(inotify.read_event) 63 assert frm.src_path == p("dir1", "a").encode() 64 assert to.src_path == p("dir2", "b").encode() 65 inotify.close() 66 67 68@pytest.mark.timeout(10) 69def test_move_internal_batch(p): 70 n = 100 71 mkdir(p("dir1")) 72 mkdir(p("dir2")) 73 files = [str(i) for i in range(n)] 74 for f in files: 75 touch(p("dir1", f)) 76 77 inotify = InotifyBuffer(p("").encode(), recursive=True) 78 79 random.shuffle(files) 80 for f in files: 81 mv(p("dir1", f), p("dir2", f)) 82 83 # Check that all n events are paired 84 for _ in range(n): 85 frm, to = wait_for_move_event(inotify.read_event) 86 assert os.path.dirname(frm.src_path).endswith(b"/dir1") 87 assert os.path.dirname(to.src_path).endswith(b"/dir2") 88 assert frm.name == to.name 89 inotify.close() 90 91 92@pytest.mark.timeout(5) 93def test_delete_watched_directory(p): 94 mkdir(p("dir")) 95 inotify = InotifyBuffer(p("dir").encode()) 96 rm(p("dir"), recursive=True) 97 98 # Wait for the event to be picked up 99 inotify.read_event() 100 101 # Ensure InotifyBuffer shuts down cleanly without raising an exception 102 inotify.close() 103 104 105@pytest.mark.timeout(5) 106@pytest.mark.skipif("GITHUB_REF" not in os.environ, reason="sudo password prompt") 107def test_unmount_watched_directory_filesystem(p): 108 mkdir(p("dir1")) 109 mount_tmpfs(p("dir1")) 110 mkdir(p("dir1/dir2")) 111 inotify = InotifyBuffer(p("dir1/dir2").encode()) 112 unmount(p("dir1")) 113 114 # Wait for the event to be picked up 115 inotify.read_event() 116 117 # Ensure InotifyBuffer shuts down cleanly without raising an exception 118 inotify.close() 119 assert not inotify.is_alive() 120 121 122def delay_call(function, seconds): 123 def delayed(*args, **kwargs): 124 time.sleep(seconds) 125 126 return function(*args, **kwargs) 127 128 return delayed 129 130 131class InotifyBufferDelayedRead(InotifyBuffer): 132 def run(self, *args, **kwargs): 133 # Introduce a delay to trigger the race condition where the file descriptor is 134 # closed prior to a read being triggered. 135 self._inotify.read_events = delay_call(self._inotify.read_events, 1) 136 137 return super().run(*args, **kwargs) 138 139 140@pytest.mark.parametrize(argnames="cls", argvalues=[InotifyBuffer, InotifyBufferDelayedRead]) 141def test_close_should_terminate_thread(p, cls): 142 inotify = cls(p("").encode(), recursive=True) 143 144 assert inotify.is_alive() 145 inotify.close() 146 assert not inotify.is_alive() 147