1from __future__ import annotations
2
3import pytest
4
5from watchdog.utils import platform
6
7if not platform.is_linux():
8    pytest.skip("GNU/Linux only.", allow_module_level=True)
9
10import ctypes
11import errno
12import logging
13import os
14import struct
15from typing import TYPE_CHECKING
16from unittest.mock import patch
17
18from watchdog.events import DirCreatedEvent, DirDeletedEvent, DirModifiedEvent
19from watchdog.observers.inotify_c import Inotify, InotifyConstants, InotifyEvent
20
21if TYPE_CHECKING:
22    from .utils import Helper, P, StartWatching, TestEventQueue
23
24logging.basicConfig(level=logging.DEBUG)
25logger = logging.getLogger(__name__)
26
27
def struct_inotify(wd, mask, cookie=0, length=0, name=b""):
    """Pack the arguments into the binary layout of a ``struct inotify_event``.

    Args:
        wd: Watch descriptor, packed as a signed C ``int``.
        mask: Event mask bits, packed as ``uint32_t``.
        cookie: Event cookie, packed as ``uint32_t``.
        length: Size in bytes of the trailing ``name`` field, packed as
            ``uint32_t``; ``name`` is NUL-padded up to this size.
        name: Raw name bytes; must not be longer than ``length``.

    Returns:
        The packed event: a 16-byte header followed by ``length`` name bytes.
    """
    # struct.pack would silently truncate a name that is longer than the field.
    assert len(name) <= length
    struct_format = (
        "="  # (native endianness, standard sizes)
        "i"  # int      wd
        "I"  # uint32_t mask   (unsigned, so masks with bit 31 set can be packed)
        "I"  # uint32_t cookie
        "I"  # uint32_t len
        f"{length}s"  # char[] name
    )
    return struct.pack(struct_format, wd, mask, cookie, length, name)
39
40
def test_late_double_deletion(helper: Helper, p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None:
    """Replay a scripted inotify stream where ``subdir1`` is created and deleted twice.

    ``os.read``/``os.close`` and the ``inotify_c`` syscall wrappers are replaced
    with fakes backed by an in-memory buffer, then the resulting watchdog
    events and the watch-descriptor bookkeeping are checked.
    """
    flags = InotifyConstants()

    # Stand-in for the inotify file descriptor; it also carries the fake state.
    inotify_fd = type("FD", (object,), {})()
    inotify_fd.wds = []  # watch descriptors added and not yet removed
    inotify_fd.last = 0  # last watch descriptor handed out

    # CREATE DELETE CREATE DELETE DELETE_SELF IGNORE DELETE_SELF IGNORE
    create_then_delete = [
        struct_inotify(wd=1, mask=flags.IN_CREATE | flags.IN_ISDIR, length=16, name=b"subdir1"),
        struct_inotify(wd=1, mask=flags.IN_DELETE | flags.IN_ISDIR, length=16, name=b"subdir1"),
    ]
    late_self_deletions = [
        struct_inotify(wd=late_wd, mask=late_mask)
        for late_wd in (2, 3)
        for late_mask in (flags.IN_DELETE_SELF, flags.IN_IGNORED)
    ]
    inotify_fd.buf = b"".join(create_then_delete * 2 + late_self_deletions)

    # Keep the real functions so descriptors other than the fake one still work.
    real_read = os.read
    real_close = os.close

    def fake_read(fd, length):
        if fd is not inotify_fd:
            return real_read(fd, length)
        chunk = fd.buf[:length]
        fd.buf = fd.buf[length:]
        return chunk

    def fake_close(fd):
        if fd is inotify_fd:
            return
        real_close(fd)

    def fake_init():
        return inotify_fd

    def fake_add_watch(fd, path, mask):
        new_wd = fd.last + 1
        fd.last = new_wd
        logger.debug("New wd = %d", new_wd)
        fd.wds.append(new_wd)
        return new_wd

    def fake_rm_watch(fd, wd):
        logger.debug("Removing wd = %d", wd)
        fd.wds.remove(wd)
        return 0

    # Mocks the API!
    from watchdog.observers import inotify_c

    read_patch = patch.object(os, "read", new=fake_read)
    close_patch = patch.object(os, "close", new=fake_close)
    init_patch = patch.object(inotify_c, "inotify_init", new=fake_init)
    add_watch_patch = patch.object(inotify_c, "inotify_add_watch", new=fake_add_watch)
    rm_watch_patch = patch.object(inotify_c, "inotify_rm_watch", new=fake_rm_watch)

    with read_patch, close_patch, init_patch, add_watch_patch, rm_watch_patch:
        start_watching(path=p(""))
        # Watchdog Events
        for expected_cls in (DirCreatedEvent, DirDeletedEvent, DirCreatedEvent, DirDeletedEvent):
            # Each create/delete of the subdirectory ...
            subdir_event = event_queue.get(timeout=5)[0]
            assert isinstance(subdir_event, expected_cls)
            assert subdir_event.src_path == p("subdir1")
            # ... is followed by a modification of the watched parent directory.
            parent_event = event_queue.get(timeout=5)[0]
            assert isinstance(parent_event, DirModifiedEvent)
            assert parent_event.src_path == p("").rstrip(os.path.sep)
        helper.close()

    assert inotify_fd.last == 3  # Number of directories
    assert inotify_fd.buf == b""  # Didn't miss any event
    assert inotify_fd.wds == [2, 3]  # Only 1 is removed explicitly
111
112
@pytest.mark.parametrize(
    ("error", "pattern"),
    [
        (errno.ENOSPC, "inotify watch limit reached"),
        (errno.EMFILE, "inotify instance limit reached"),
        (errno.ENOENT, "No such file or directory"),
        (-1, "error"),
    ],
)
def test_raise_error(error, pattern):
    """Check that ``Inotify._raise_error`` raises ``OSError`` with the patched errno and a matching message."""

    def fake_get_errno():
        # Report the parametrized errno instead of the thread's real one.
        return error

    errno_patch = patch.object(ctypes, "get_errno", new=fake_get_errno)
    with errno_patch, pytest.raises(OSError, match=pattern) as raised:
        Inotify._raise_error()  # noqa: SLF001
    assert raised.value.errno == error
126
127
def test_non_ascii_path(p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None:
    """
    Creating a directory with a non-ASCII name yields an event whose path is a ``str``.
    """
    snowman_dir = p("\N{SNOWMAN}")
    start_watching(path=p(""))
    os.mkdir(snowman_dir)

    event, _ = event_queue.get(timeout=5)
    reported_path = event.src_path
    assert isinstance(reported_path, str)
    assert reported_path == snowman_dir
    # Just make sure it doesn't raise an exception.
    assert repr(event)
140
141
def test_watch_file(p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None:
    """Watch a single file, remove it, and check that an event with a usable repr is queued."""
    watched_file = p("this_is_a_file")
    # Opening in append mode creates the file without writing anything to it.
    with open(watched_file, "a"):
        pass
    start_watching(path=watched_file)
    os.remove(watched_file)

    queued = event_queue.get(timeout=5)
    event, _ = queued
    assert repr(event)
150
151
def test_event_equality(p: P) -> None:
    """Events built from identical values compare equal; events differing only in mask do not."""
    filename = "file.ext"
    wd_parent_dir = 42
    full_path = p(filename)

    def make_event(mask):
        # Only the mask varies between the events under comparison.
        return InotifyEvent(wd_parent_dir, mask, 0, filename, full_path)

    created = make_event(InotifyConstants.IN_CREATE)
    created_again = make_event(InotifyConstants.IN_CREATE)
    accessed = make_event(InotifyConstants.IN_ACCESS)

    assert created == created_again
    assert created != accessed
    assert created_again != accessed
162