""":module: watchdog.tricks
:synopsis: Utility event handlers.
:author: [email protected] (Yesudeep Mangalapilly)
:author: [email protected] (Mickaël Schoentgen)

Classes
-------
.. autoclass:: Trick
   :members:
   :show-inheritance:

.. autoclass:: LoggerTrick
   :members:
   :show-inheritance:

.. autoclass:: ShellCommandTrick
   :members:
   :show-inheritance:

.. autoclass:: AutoRestartTrick
   :members:
   :show-inheritance:

"""

from __future__ import annotations

import contextlib
import functools
import logging
import os
import signal
import subprocess
import threading
import time

from watchdog.events import EVENT_TYPE_CLOSED_NO_WRITE, EVENT_TYPE_OPENED, FileSystemEvent, PatternMatchingEventHandler
from watchdog.utils import echo, platform
from watchdog.utils.event_debouncer import EventDebouncer
from watchdog.utils.process_watcher import ProcessWatcher

logger = logging.getLogger(__name__)

# Decorator that wraps a handler with ``echo.echo`` and routes whatever it
# writes to this module's logger at INFO level.
echo_events = functools.partial(echo.echo, write=lambda msg: logger.info(msg))


class Trick(PatternMatchingEventHandler):
    """Your tricks should subclass this class."""

    @classmethod
    def generate_yaml(cls) -> str:
        """Return a sample YAML configuration entry for this trick class.

        The entry is keyed by the fully qualified class name and contains
        placeholder ``args`` and ``kwargs`` sections.
        """
        return f"""- {cls.__module__}.{cls.__name__}:
  args:
  - argument1
  - argument2
  kwargs:
    patterns:
    - "*.py"
    - "*.js"
    ignore_patterns:
    - "version.py"
    ignore_directories: false
"""


class LoggerTrick(Trick):
    """A simple trick that only logs events."""

    @echo_events
    def on_any_event(self, event: FileSystemEvent) -> None:
        """Do nothing; the ``echo_events`` decorator performs the logging."""


class ShellCommandTrick(Trick):
    """Executes shell commands in response to matched events.

    The command is a :class:`string.Template`; the placeholders
    ``${watch_src_path}``, ``${watch_dest_path}``, ``${watch_event_type}`` and
    ``${watch_object}`` are substituted before the command is run.
    """

    def __init__(
        self,
        shell_command: str | None,
        *,
        patterns: list[str] | None = None,
        ignore_patterns: list[str] | None = None,
        ignore_directories: bool = False,
        wait_for_process: bool = False,
        drop_during_process: bool = False,
    ) -> None:
        """Configure the trick.

        :param shell_command: Command template to run for each event. When
            ``None``, a default ``echo`` command describing the event is used.
        :param patterns: Forwarded to the pattern-matching base handler.
        :param ignore_patterns: Forwarded to the pattern-matching base handler.
        :param ignore_directories: Forwarded to the pattern-matching base handler.
        :param wait_for_process: When true, block in the event handler until
            the spawned command has exited.
        :param drop_during_process: When true, ignore events that arrive while
            a previously spawned command is still running.
        """
        super().__init__(
            patterns=patterns,
            ignore_patterns=ignore_patterns,
            ignore_directories=ignore_directories,
        )
        self.shell_command = shell_command
        self.wait_for_process = wait_for_process
        self.drop_during_process = drop_during_process

        # Most recently spawned command, if any.
        self.process: subprocess.Popen[bytes] | None = None
        # Watchers for commands started without waiting; each one removes
        # itself from this set through its termination callback.
        self._process_watchers: set[ProcessWatcher] = set()

    def on_any_event(self, event: FileSystemEvent) -> None:
        """Run the configured shell command for ``event``.

        Opened and closed-without-write events are ignored, as are events that
        arrive while a command is running when ``drop_during_process`` is set.
        """
        if event.event_type in {EVENT_TYPE_OPENED, EVENT_TYPE_CLOSED_NO_WRITE}:
            # FIXME: see issue #949, and find a way to better handle that scenario
            return

        from string import Template

        if self.drop_during_process and self.is_process_running():
            return

        object_type = "directory" if event.is_directory else "file"
        context = {
            "watch_src_path": event.src_path,
            "watch_dest_path": "",
            "watch_event_type": event.event_type,
            "watch_object": object_type,
        }

        has_dest_path = hasattr(event, "dest_path")
        if has_dest_path:
            # Must be stored under the same key the templates expand
            # (``${watch_dest_path}``), including for the default command.
            context["watch_dest_path"] = event.dest_path

        if self.shell_command is not None:
            command = self.shell_command
        elif has_dest_path:
            command = 'echo "${watch_event_type} ${watch_object} from ${watch_src_path} to ${watch_dest_path}"'
        else:
            command = 'echo "${watch_event_type} ${watch_object} ${watch_src_path}"'

        command = Template(command).safe_substitute(**context)
        # NOTE(review): the substituted paths are not shell-quoted and the
        # command runs with ``shell=True``; file names containing shell
        # metacharacters are interpreted by the shell. Quoting here would
        # change the result for templates that already quote placeholders.
        self.process = subprocess.Popen(command, shell=True)
        if self.wait_for_process:
            self.process.wait()
        else:
            # The callback is attached after construction because it needs a
            # reference to the watcher itself.
            process_watcher = ProcessWatcher(self.process, None)
            self._process_watchers.add(process_watcher)
            process_watcher.process_termination_callback = functools.partial(
                self._process_watchers.discard,
                process_watcher,
            )
            process_watcher.start()

    def is_process_running(self) -> bool:
        """Return whether a previously spawned command is still considered running.

        True when any process watcher is still registered, or when the most
        recently spawned process has not reported an exit status yet.
        """
        return bool(self._process_watchers or (self.process is not None and self.process.poll() is None))


class AutoRestartTrick(Trick):
    """Starts a long-running subprocess and restarts it on matched events.

    The command parameter is a list of command arguments, such as
    `['bin/myserver', '-c', 'etc/myconfig.ini']`.

    Call `start()` after creating the Trick. Call `stop()` when stopping
    the process.
    """

    def __init__(
        self,
        command: list[str],
        *,
        patterns: list[str] | None = None,
        ignore_patterns: list[str] | None = None,
        ignore_directories: bool = False,
        stop_signal: signal.Signals = signal.SIGINT,
        kill_after: int = 10,
        debounce_interval_seconds: int = 0,
        restart_on_command_exit: bool = True,
    ) -> None:
        """Configure the trick; the subprocess is not started until ``start()``.

        :param command: Argument list of the subprocess to run.
        :param patterns: Forwarded to the pattern-matching base handler.
        :param ignore_patterns: Forwarded to the pattern-matching base handler.
        :param ignore_directories: Forwarded to the pattern-matching base handler.
        :param stop_signal: Signal sent first when stopping the subprocess.
        :param kill_after: Seconds to wait after ``stop_signal`` before the
            subprocess is force-killed.
        :param debounce_interval_seconds: When non-zero, events are routed
            through an ``EventDebouncer`` created with this interval instead
            of restarting immediately.
        :param restart_on_command_exit: When true, a ``ProcessWatcher`` is
            attached to the subprocess with the restart routine as callback.
        :raises ValueError: If ``kill_after`` or ``debounce_interval_seconds``
            is negative.
        """
        if kill_after < 0:
            error = "kill_after must be non-negative."
            raise ValueError(error)
        if debounce_interval_seconds < 0:
            error = "debounce_interval_seconds must be non-negative."
            raise ValueError(error)

        super().__init__(
            patterns=patterns,
            ignore_patterns=ignore_patterns,
            ignore_directories=ignore_directories,
        )

        self.command = command
        self.stop_signal = stop_signal
        self.kill_after = kill_after
        self.debounce_interval_seconds = debounce_interval_seconds
        self.restart_on_command_exit = restart_on_command_exit

        self.process: subprocess.Popen[bytes] | None = None
        self.process_watcher: ProcessWatcher | None = None
        self.event_debouncer: EventDebouncer | None = None
        # Number of completed restarts (the initial start is not counted).
        self.restart_count = 0

        self._is_process_stopping = False
        self._is_trick_stopping = False
        # Re-entrant lock guarding the two "stopping" flags above.
        self._stopping_lock = threading.RLock()

    def start(self) -> None:
        """Start the debouncer (when configured) and then the subprocess."""
        if self.debounce_interval_seconds:
            self.event_debouncer = EventDebouncer(
                debounce_interval_seconds=self.debounce_interval_seconds,
                events_callback=lambda events: self._restart_process(),
            )
            self.event_debouncer.start()
        self._start_process()

    def stop(self) -> None:
        """Stop the subprocess and wait for the background threads to finish.

        Only the first call does any work; later calls return immediately.
        """
        # Ensure the body of the function is only run once.
        with self._stopping_lock:
            if self._is_trick_stopping:
                return
            self._is_trick_stopping = True

        # Keep a reference: ``_stop_process`` clears ``self.process_watcher``.
        process_watcher = self.process_watcher
        if self.event_debouncer is not None:
            self.event_debouncer.stop()
        self._stop_process()

        # Don't leak threads: Wait for background threads to stop.
        if self.event_debouncer is not None:
            self.event_debouncer.join()
        if process_watcher is not None:
            process_watcher.join()

    def _start_process(self) -> None:
        """Spawn the subprocess unless the trick is being stopped."""
        if self._is_trick_stopping:
            return

        # Run the child in its own session (and therefore its own process
        # group) so the whole group can be signalled when stopping.
        # ``start_new_session`` performs the ``setsid()`` call without
        # ``preexec_fn``, which is documented as unsafe in threaded programs;
        # the flag has no effect on Windows, which has no ``setsid``.
        self.process = subprocess.Popen(self.command, start_new_session=True)
        if self.restart_on_command_exit:
            self.process_watcher = ProcessWatcher(self.process, self._restart_process)
            self.process_watcher.start()

    def _stop_process(self) -> None:
        """Stop the watcher and the subprocess, force-killing after ``kill_after`` seconds."""
        # Ensure the body of the function is not run in parallel in different threads.
        with self._stopping_lock:
            if self._is_process_stopping:
                return
            self._is_process_stopping = True

        try:
            # Stop the watcher first so that the termination caused below is
            # not reported through it.
            if self.process_watcher is not None:
                self.process_watcher.stop()
                self.process_watcher = None

            if self.process is not None:
                try:
                    kill_process(self.process.pid, self.stop_signal)
                except OSError:
                    # Process is already gone
                    pass
                else:
                    # Monotonic clock: the grace period must not be affected
                    # by wall-clock adjustments.
                    deadline = time.monotonic() + self.kill_after
                    while time.monotonic() < deadline:
                        if self.process.poll() is not None:
                            break
                        time.sleep(0.25)
                    else:
                        # The process did not exit within the grace period:
                        # force-kill it (it may have exited in the meantime,
                        # hence the suppressed OSError).
                        with contextlib.suppress(OSError):
                            kill_process(self.process.pid, 9)
                self.process = None
        finally:
            self._is_process_stopping = False

    @echo_events
    def on_any_event(self, event: FileSystemEvent) -> None:
        """Restart the subprocess, directly or through the debouncer.

        Opened and closed-without-write events are ignored.
        """
        if event.event_type in {EVENT_TYPE_OPENED, EVENT_TYPE_CLOSED_NO_WRITE}:
            # FIXME: see issue #949, and find a way to better handle that scenario
            return

        if self.event_debouncer is not None:
            self.event_debouncer.handle_event(event)
        else:
            self._restart_process()

    def _restart_process(self) -> None:
        """Stop and start the subprocess again unless the trick is being stopped."""
        if self._is_trick_stopping:
            return
        self._stop_process()
        self._start_process()
        self.restart_count += 1


if platform.is_windows():

    def kill_process(pid: int, stop_signal: int) -> None:
        """Send ``stop_signal`` to the process identified by ``pid``."""
        os.kill(pid, stop_signal)

else:

    def kill_process(pid: int, stop_signal: int) -> None:
        """Send ``stop_signal`` to the whole process group that ``pid`` belongs to."""
        os.killpg(os.getpgid(pid), stop_signal)