1from __future__ import annotations
2
3import pytest
4
5from watchdog.utils import platform
6
7if not platform.is_linux():
8    pytest.skip("GNU/Linux only.", allow_module_level=True)
9
10import os
11import random
12import time
13
14from watchdog.observers.inotify_buffer import InotifyBuffer
15
16from .shell import mkdir, mount_tmpfs, mv, rm, touch, unmount
17
18
19def wait_for_move_event(read_event):
20    while True:
21        event = read_event()
22        if isinstance(event, tuple) or event.is_move:
23            return event
24
25
26@pytest.mark.timeout(5)
27def test_move_from(p):
28    mkdir(p("dir1"))
29    mkdir(p("dir2"))
30    touch(p("dir1", "a"))
31
32    inotify = InotifyBuffer(p("dir1").encode())
33    mv(p("dir1", "a"), p("dir2", "b"))
34    event = wait_for_move_event(inotify.read_event)
35    assert event.is_moved_from
36    assert event.src_path == p("dir1", "a").encode()
37    inotify.close()
38
39
40@pytest.mark.timeout(5)
41def test_move_to(p):
42    mkdir(p("dir1"))
43    mkdir(p("dir2"))
44    touch(p("dir1", "a"))
45
46    inotify = InotifyBuffer(p("dir2").encode())
47    mv(p("dir1", "a"), p("dir2", "b"))
48    event = wait_for_move_event(inotify.read_event)
49    assert event.is_moved_to
50    assert event.src_path == p("dir2", "b").encode()
51    inotify.close()
52
53
54@pytest.mark.timeout(5)
55def test_move_internal(p):
56    mkdir(p("dir1"))
57    mkdir(p("dir2"))
58    touch(p("dir1", "a"))
59
60    inotify = InotifyBuffer(p("").encode(), recursive=True)
61    mv(p("dir1", "a"), p("dir2", "b"))
62    frm, to = wait_for_move_event(inotify.read_event)
63    assert frm.src_path == p("dir1", "a").encode()
64    assert to.src_path == p("dir2", "b").encode()
65    inotify.close()
66
67
68@pytest.mark.timeout(10)
69def test_move_internal_batch(p):
70    n = 100
71    mkdir(p("dir1"))
72    mkdir(p("dir2"))
73    files = [str(i) for i in range(n)]
74    for f in files:
75        touch(p("dir1", f))
76
77    inotify = InotifyBuffer(p("").encode(), recursive=True)
78
79    random.shuffle(files)
80    for f in files:
81        mv(p("dir1", f), p("dir2", f))
82
83    # Check that all n events are paired
84    for _ in range(n):
85        frm, to = wait_for_move_event(inotify.read_event)
86        assert os.path.dirname(frm.src_path).endswith(b"/dir1")
87        assert os.path.dirname(to.src_path).endswith(b"/dir2")
88        assert frm.name == to.name
89    inotify.close()
90
91
92@pytest.mark.timeout(5)
93def test_delete_watched_directory(p):
94    mkdir(p("dir"))
95    inotify = InotifyBuffer(p("dir").encode())
96    rm(p("dir"), recursive=True)
97
98    # Wait for the event to be picked up
99    inotify.read_event()
100
101    # Ensure InotifyBuffer shuts down cleanly without raising an exception
102    inotify.close()
103
104
105@pytest.mark.timeout(5)
106@pytest.mark.skipif("GITHUB_REF" not in os.environ, reason="sudo password prompt")
107def test_unmount_watched_directory_filesystem(p):
108    mkdir(p("dir1"))
109    mount_tmpfs(p("dir1"))
110    mkdir(p("dir1/dir2"))
111    inotify = InotifyBuffer(p("dir1/dir2").encode())
112    unmount(p("dir1"))
113
114    # Wait for the event to be picked up
115    inotify.read_event()
116
117    # Ensure InotifyBuffer shuts down cleanly without raising an exception
118    inotify.close()
119    assert not inotify.is_alive()
120
121
122def delay_call(function, seconds):
123    def delayed(*args, **kwargs):
124        time.sleep(seconds)
125
126        return function(*args, **kwargs)
127
128    return delayed
129
130
131class InotifyBufferDelayedRead(InotifyBuffer):
132    def run(self, *args, **kwargs):
133        # Introduce a delay to trigger the race condition where the file descriptor is
134        # closed prior to a read being triggered.
135        self._inotify.read_events = delay_call(self._inotify.read_events, 1)
136
137        return super().run(*args, **kwargs)
138
139
140@pytest.mark.parametrize(argnames="cls", argvalues=[InotifyBuffer, InotifyBufferDelayedRead])
141def test_close_should_terminate_thread(p, cls):
142    inotify = cls(p("").encode(), recursive=True)
143
144    assert inotify.is_alive()
145    inotify.close()
146    assert not inotify.is_alive()
147