from __future__ import annotations

import pytest

from watchdog.utils import platform

# The whole module exercises the inotify backend, so skip it before importing
# anything that only exists on GNU/Linux.
if not platform.is_linux():
    pytest.skip("GNU/Linux only.", allow_module_level=True)

import ctypes
import errno
import logging
import os
import struct
from typing import TYPE_CHECKING
from unittest.mock import patch

from watchdog.events import DirCreatedEvent, DirDeletedEvent, DirModifiedEvent
from watchdog.observers.inotify_c import Inotify, InotifyConstants, InotifyEvent

if TYPE_CHECKING:
    from .utils import Helper, P, StartWatching, TestEventQueue

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


def struct_inotify(wd: int, mask: int, cookie: int = 0, length: int = 0, name: bytes = b"") -> bytes:
    """Pack one raw inotify event record.

    Args:
        wd: Watch descriptor the event is reported for.
        mask: Bitmask of ``IN_*`` flags describing the event.
        cookie: Cookie value stored in the record (defaults to 0).
        length: Size in bytes of the ``name`` field of the record.
        name: Entry name; must not be longer than ``length``. ``struct`` pads
            it with NUL bytes up to ``length``.

    Returns:
        The packed header followed by the ``length``-byte name field.
    """
    assert len(name) <= length
    struct_format = (
        "="  # (native endianness, standard sizes)
        "i"  # int      wd
        "I"  # uint32_t mask
        "I"  # uint32_t cookie
        "I"  # uint32_t len
        f"{length}s"  # char[] name
    )
    # The three uint32_t fields use the unsigned code so that masks with the
    # top bit set (>= 2**31) can be packed instead of raising struct.error.
    return struct.pack(struct_format, wd, mask, cookie, length, name)


def test_late_double_deletion(helper: Helper, p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None:
    """Replay a canned inotify byte stream through a fake file descriptor.

    The stream reports a sub-directory being created and deleted twice, and
    only afterwards the DELETE_SELF / IGNORED records for the two
    corresponding watches. The test checks the emitted watchdog events, that
    the whole buffer was consumed and which watches were removed explicitly.
    """
    # Stand-in for the inotify file descriptor; it carries the test state.
    inotify_fd = type("FD", (object,), {})()
    inotify_fd.last = 0  # last watch descriptor handed out
    inotify_fd.wds = []  # watch descriptors added and not removed via rm_watch

    const = InotifyConstants()

    # CREATE DELETE CREATE DELETE DELETE_SELF IGNORE DELETE_SELF IGNORE
    inotify_fd.buf = (
        struct_inotify(wd=1, mask=const.IN_CREATE | const.IN_ISDIR, length=16, name=b"subdir1")
        + struct_inotify(wd=1, mask=const.IN_DELETE | const.IN_ISDIR, length=16, name=b"subdir1")
    ) * 2 + (
        struct_inotify(wd=2, mask=const.IN_DELETE_SELF)
        + struct_inotify(wd=2, mask=const.IN_IGNORED)
        + struct_inotify(wd=3, mask=const.IN_DELETE_SELF)
        + struct_inotify(wd=3, mask=const.IN_IGNORED)
    )

    os_read_bkp = os.read

    def fakeread(fd, length):
        """Serve reads on the fake descriptor from its buffer; delegate others."""
        if fd is inotify_fd:
            # Hand out at most `length` bytes and keep the remainder for later.
            result, fd.buf = fd.buf[:length], fd.buf[length:]
            return result
        return os_read_bkp(fd, length)

    os_close_bkp = os.close

    def fakeclose(fd):
        """Close real descriptors only; the fake one has nothing to close."""
        if fd is not inotify_fd:
            os_close_bkp(fd)

    def inotify_init():
        """Return the fake descriptor instead of creating an inotify instance."""
        return inotify_fd

    def inotify_add_watch(fd, path, mask):
        """Hand out the next sequential watch descriptor and record it."""
        fd.last += 1
        logger.debug("New wd = %d", fd.last)
        fd.wds.append(fd.last)
        return fd.last

    def inotify_rm_watch(fd, wd):
        """Forget an explicitly removed watch descriptor and report success."""
        logger.debug("Removing wd = %d", wd)
        fd.wds.remove(wd)
        return 0

    # Mocks the API!
    from watchdog.observers import inotify_c

    mock1 = patch.object(os, "read", new=fakeread)
    mock2 = patch.object(os, "close", new=fakeclose)
    mock3 = patch.object(inotify_c, "inotify_init", new=inotify_init)
    mock4 = patch.object(inotify_c, "inotify_add_watch", new=inotify_add_watch)
    mock5 = patch.object(inotify_c, "inotify_rm_watch", new=inotify_rm_watch)

    with mock1, mock2, mock3, mock4, mock5:
        start_watching(path=p(""))
        # Watchdog Events
        for evt_cls in [DirCreatedEvent, DirDeletedEvent] * 2:
            event = event_queue.get(timeout=5)[0]
            assert isinstance(event, evt_cls)
            assert event.src_path == p("subdir1")
            # Each create/delete is followed by a modification of the parent.
            event = event_queue.get(timeout=5)[0]
            assert isinstance(event, DirModifiedEvent)
            assert event.src_path == p("").rstrip(os.path.sep)
        # Shut down while the mocks are still active so the fake fd is handled.
        helper.close()

    assert inotify_fd.last == 3  # Number of directories
    assert inotify_fd.buf == b""  # Didn't miss any event
    assert inotify_fd.wds == [2, 3]  # Only 1 is removed explicitly


@pytest.mark.parametrize(
    ("error", "pattern"),
    [
        (errno.ENOSPC, "inotify watch limit reached"),
        (errno.EMFILE, "inotify instance limit reached"),
        (errno.ENOENT, "No such file or directory"),
        (-1, "error"),
    ],
)
def test_raise_error(error, pattern):
    """``Inotify._raise_error`` raises ``OSError`` carrying the current errno.

    ``ctypes.get_errno`` is patched to return ``error``; the raised exception
    must match ``pattern`` and expose the same errno value.
    """
    with patch.object(ctypes, "get_errno", new=lambda: error), pytest.raises(OSError, match=pattern) as exc:
        Inotify._raise_error()  # noqa: SLF001
    assert exc.value.errno == error


def test_non_ascii_path(p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None:
    """
    Inotify can construct an event for a path containing non-ASCII.
    """
    path = p("\N{SNOWMAN}")
    start_watching(path=p(""))
    os.mkdir(path)
    event, _ = event_queue.get(timeout=5)
    assert isinstance(event.src_path, str)
    assert event.src_path == path
    # Just make sure it doesn't raise an exception.
    assert repr(event)


def test_watch_file(p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None:
    """Watching a single file yields an event when that file is removed."""
    path = p("this_is_a_file")
    # Create the (empty) file so there is something to watch.
    with open(path, "a"):
        pass
    start_watching(path=path)
    os.remove(path)
    event, _ = event_queue.get(timeout=5)
    assert repr(event)


def test_event_equality(p: P) -> None:
    """``InotifyEvent`` instances compare equal only when their fields match."""
    wd_parent_dir = 42
    filename = "file.ext"
    full_path = p(filename)
    event1 = InotifyEvent(wd_parent_dir, InotifyConstants.IN_CREATE, 0, filename, full_path)
    event2 = InotifyEvent(wd_parent_dir, InotifyConstants.IN_CREATE, 0, filename, full_path)
    # Same fields except for the mask.
    event3 = InotifyEvent(wd_parent_dir, InotifyConstants.IN_ACCESS, 0, filename, full_path)
    assert event1 == event2
    assert event1 != event3
    assert event2 != event3