1""":module: watchdog.tricks
2:synopsis: Utility event handlers.
3:author: [email protected] (Yesudeep Mangalapilly)
4:author: [email protected] (Mickaël Schoentgen)
5
6Classes
7-------
8.. autoclass:: Trick
9   :members:
10   :show-inheritance:
11
12.. autoclass:: LoggerTrick
13   :members:
14   :show-inheritance:
15
16.. autoclass:: ShellCommandTrick
17   :members:
18   :show-inheritance:
19
20.. autoclass:: AutoRestartTrick
21   :members:
22   :show-inheritance:
23
24"""
25
26from __future__ import annotations
27
28import contextlib
29import functools
30import logging
31import os
32import signal
33import subprocess
34import threading
35import time
36
37from watchdog.events import EVENT_TYPE_CLOSED_NO_WRITE, EVENT_TYPE_OPENED, FileSystemEvent, PatternMatchingEventHandler
38from watchdog.utils import echo, platform
39from watchdog.utils.event_debouncer import EventDebouncer
40from watchdog.utils.process_watcher import ProcessWatcher
41
42logger = logging.getLogger(__name__)
43echo_events = functools.partial(echo.echo, write=lambda msg: logger.info(msg))
44
45
46class Trick(PatternMatchingEventHandler):
47    """Your tricks should subclass this class."""
48
49    @classmethod
50    def generate_yaml(cls) -> str:
51        return f"""- {cls.__module__}.{cls.__name__}:
52  args:
53  - argument1
54  - argument2
55  kwargs:
56    patterns:
57    - "*.py"
58    - "*.js"
59    ignore_patterns:
60    - "version.py"
61    ignore_directories: false
62"""
63
64
65class LoggerTrick(Trick):
66    """A simple trick that does only logs events."""
67
68    @echo_events
69    def on_any_event(self, event: FileSystemEvent) -> None:
70        pass
71
72
73class ShellCommandTrick(Trick):
74    """Executes shell commands in response to matched events."""
75
76    def __init__(
77        self,
78        shell_command: str,
79        *,
80        patterns: list[str] | None = None,
81        ignore_patterns: list[str] | None = None,
82        ignore_directories: bool = False,
83        wait_for_process: bool = False,
84        drop_during_process: bool = False,
85    ):
86        super().__init__(
87            patterns=patterns,
88            ignore_patterns=ignore_patterns,
89            ignore_directories=ignore_directories,
90        )
91        self.shell_command = shell_command
92        self.wait_for_process = wait_for_process
93        self.drop_during_process = drop_during_process
94
95        self.process: subprocess.Popen[bytes] | None = None
96        self._process_watchers: set[ProcessWatcher] = set()
97
98    def on_any_event(self, event: FileSystemEvent) -> None:
99        if event.event_type in {EVENT_TYPE_OPENED, EVENT_TYPE_CLOSED_NO_WRITE}:
100            # FIXME: see issue #949, and find a way to better handle that scenario
101            return
102
103        from string import Template
104
105        if self.drop_during_process and self.is_process_running():
106            return
107
108        object_type = "directory" if event.is_directory else "file"
109        context = {
110            "watch_src_path": event.src_path,
111            "watch_dest_path": "",
112            "watch_event_type": event.event_type,
113            "watch_object": object_type,
114        }
115
116        if self.shell_command is None:
117            if hasattr(event, "dest_path"):
118                context["dest_path"] = event.dest_path
119                command = 'echo "${watch_event_type} ${watch_object} from ${watch_src_path} to ${watch_dest_path}"'
120            else:
121                command = 'echo "${watch_event_type} ${watch_object} ${watch_src_path}"'
122        else:
123            if hasattr(event, "dest_path"):
124                context["watch_dest_path"] = event.dest_path
125            command = self.shell_command
126
127        command = Template(command).safe_substitute(**context)
128        self.process = subprocess.Popen(command, shell=True)
129        if self.wait_for_process:
130            self.process.wait()
131        else:
132            process_watcher = ProcessWatcher(self.process, None)
133            self._process_watchers.add(process_watcher)
134            process_watcher.process_termination_callback = functools.partial(
135                self._process_watchers.discard,
136                process_watcher,
137            )
138            process_watcher.start()
139
140    def is_process_running(self) -> bool:
141        return bool(self._process_watchers or (self.process is not None and self.process.poll() is None))
142
143
144class AutoRestartTrick(Trick):
145    """Starts a long-running subprocess and restarts it on matched events.
146
147    The command parameter is a list of command arguments, such as
148    `['bin/myserver', '-c', 'etc/myconfig.ini']`.
149
150    Call `start()` after creating the Trick. Call `stop()` when stopping
151    the process.
152    """
153
154    def __init__(
155        self,
156        command: list[str],
157        *,
158        patterns: list[str] | None = None,
159        ignore_patterns: list[str] | None = None,
160        ignore_directories: bool = False,
161        stop_signal: signal.Signals = signal.SIGINT,
162        kill_after: int = 10,
163        debounce_interval_seconds: int = 0,
164        restart_on_command_exit: bool = True,
165    ):
166        if kill_after < 0:
167            error = "kill_after must be non-negative."
168            raise ValueError(error)
169        if debounce_interval_seconds < 0:
170            error = "debounce_interval_seconds must be non-negative."
171            raise ValueError(error)
172
173        super().__init__(
174            patterns=patterns,
175            ignore_patterns=ignore_patterns,
176            ignore_directories=ignore_directories,
177        )
178
179        self.command = command
180        self.stop_signal = stop_signal
181        self.kill_after = kill_after
182        self.debounce_interval_seconds = debounce_interval_seconds
183        self.restart_on_command_exit = restart_on_command_exit
184
185        self.process: subprocess.Popen[bytes] | None = None
186        self.process_watcher: ProcessWatcher | None = None
187        self.event_debouncer: EventDebouncer | None = None
188        self.restart_count = 0
189
190        self._is_process_stopping = False
191        self._is_trick_stopping = False
192        self._stopping_lock = threading.RLock()
193
194    def start(self) -> None:
195        if self.debounce_interval_seconds:
196            self.event_debouncer = EventDebouncer(
197                debounce_interval_seconds=self.debounce_interval_seconds,
198                events_callback=lambda events: self._restart_process(),
199            )
200            self.event_debouncer.start()
201        self._start_process()
202
203    def stop(self) -> None:
204        # Ensure the body of the function is only run once.
205        with self._stopping_lock:
206            if self._is_trick_stopping:
207                return
208            self._is_trick_stopping = True
209
210        process_watcher = self.process_watcher
211        if self.event_debouncer is not None:
212            self.event_debouncer.stop()
213        self._stop_process()
214
215        # Don't leak threads: Wait for background threads to stop.
216        if self.event_debouncer is not None:
217            self.event_debouncer.join()
218        if process_watcher is not None:
219            process_watcher.join()
220
221    def _start_process(self) -> None:
222        if self._is_trick_stopping:
223            return
224
225        # windows doesn't have setsid
226        self.process = subprocess.Popen(self.command, preexec_fn=getattr(os, "setsid", None))
227        if self.restart_on_command_exit:
228            self.process_watcher = ProcessWatcher(self.process, self._restart_process)
229            self.process_watcher.start()
230
231    def _stop_process(self) -> None:
232        # Ensure the body of the function is not run in parallel in different threads.
233        with self._stopping_lock:
234            if self._is_process_stopping:
235                return
236            self._is_process_stopping = True
237
238        try:
239            if self.process_watcher is not None:
240                self.process_watcher.stop()
241                self.process_watcher = None
242
243            if self.process is not None:
244                try:
245                    kill_process(self.process.pid, self.stop_signal)
246                except OSError:
247                    # Process is already gone
248                    pass
249                else:
250                    kill_time = time.time() + self.kill_after
251                    while time.time() < kill_time:
252                        if self.process.poll() is not None:
253                            break
254                        time.sleep(0.25)
255                    else:
256                        # Process is already gone
257                        with contextlib.suppress(OSError):
258                            kill_process(self.process.pid, 9)
259                self.process = None
260        finally:
261            self._is_process_stopping = False
262
263    @echo_events
264    def on_any_event(self, event: FileSystemEvent) -> None:
265        if event.event_type in {EVENT_TYPE_OPENED, EVENT_TYPE_CLOSED_NO_WRITE}:
266            # FIXME: see issue #949, and find a way to better handle that scenario
267            return
268
269        if self.event_debouncer is not None:
270            self.event_debouncer.handle_event(event)
271        else:
272            self._restart_process()
273
274    def _restart_process(self) -> None:
275        if self._is_trick_stopping:
276            return
277        self._stop_process()
278        self._start_process()
279        self.restart_count += 1
280
281
282if platform.is_windows():
283
284    def kill_process(pid: int, stop_signal: int) -> None:
285        os.kill(pid, stop_signal)
286
287else:
288
289    def kill_process(pid: int, stop_signal: int) -> None:
290        os.killpg(os.getpgid(pid), stop_signal)
291