1from __future__ import annotations
2
3import os
4import sys
5import time
6from unittest.mock import patch
7
8import pytest
9
10# Skip if import PyYAML failed. PyYAML missing possible because
11# watchdog installed without watchmedo. See Installation section
12# in README.rst
13yaml = pytest.importorskip("yaml")
14
15from yaml.constructor import ConstructorError  # noqa: E402
16from yaml.scanner import ScannerError  # noqa: E402
17
18from watchdog import watchmedo  # noqa: E402
19from watchdog.events import FileModifiedEvent, FileOpenedEvent  # noqa: E402
20from watchdog.tricks import AutoRestartTrick, ShellCommandTrick  # noqa: E402
21from watchdog.utils import WatchdogShutdownError, platform  # noqa: E402
22
23
24def test_load_config_valid(tmpdir):
25    """Verifies the load of a valid yaml file"""
26
27    yaml_file = os.path.join(tmpdir, "config_file.yaml")
28    with open(yaml_file, "w") as f:
29        f.write("one: value\ntwo:\n- value1\n- value2\n")
30
31    config = watchmedo.load_config(yaml_file)
32    assert isinstance(config, dict)
33    assert "one" in config
34    assert "two" in config
35    assert isinstance(config["two"], list)
36    assert config["one"] == "value"
37    assert config["two"] == ["value1", "value2"]
38
39
40def test_load_config_invalid(tmpdir):
41    """Verifies if safe load avoid the execution
42    of untrusted code inside yaml files"""
43
44    critical_dir = os.path.join(tmpdir, "critical")
45    yaml_file = os.path.join(tmpdir, "tricks_file.yaml")
46    with open(yaml_file, "w") as f:
47        content = f'one: value\nrun: !!python/object/apply:os.system ["mkdir {critical_dir}"]\n'
48        f.write(content)
49
50    # PyYAML get_single_data() raises different exceptions for Linux and Windows
51    with pytest.raises((ConstructorError, ScannerError)):
52        watchmedo.load_config(yaml_file)
53
54    assert not os.path.exists(critical_dir)
55
56
57def make_dummy_script(tmpdir, n=10):
58    script = os.path.join(tmpdir, f"auto-test-{n}.py")
59    with open(script, "w") as f:
60        f.write('import time\nfor i in range(%d):\n\tprint("+++++ %%d" %% i, flush=True)\n\ttime.sleep(1)\n' % n)
61    return script
62
63
64def test_kill_auto_restart(tmpdir, capfd):
65    script = make_dummy_script(tmpdir)
66    a = AutoRestartTrick([sys.executable, script])
67    a.start()
68    time.sleep(3)
69    a.stop()
70    cap = capfd.readouterr()
71    assert "+++++ 0" in cap.out
72    assert "+++++ 9" not in cap.out  # we killed the subprocess before the end
73    # in windows we seem to lose the subprocess stderr
74    # assert 'KeyboardInterrupt' in cap.err
75
76
77def test_shell_command_wait_for_completion(tmpdir, capfd):
78    script = make_dummy_script(tmpdir, n=1)
79    command = f"{sys.executable} {script}"
80    trick = ShellCommandTrick(command, wait_for_process=True)
81    assert not trick.is_process_running()
82    start_time = time.monotonic()
83    trick.on_any_event(FileModifiedEvent("foo/bar.baz"))
84    elapsed = time.monotonic() - start_time
85    assert not trick.is_process_running()
86    assert elapsed >= 1
87
88
89def test_shell_command_subprocess_termination_nowait(tmpdir):
90    script = make_dummy_script(tmpdir, n=1)
91    command = f"{sys.executable} {script}"
92    trick = ShellCommandTrick(command, wait_for_process=False)
93    assert not trick.is_process_running()
94    trick.on_any_event(FileModifiedEvent("foo/bar.baz"))
95    assert trick.is_process_running()
96    time.sleep(5)
97    assert not trick.is_process_running()
98
99
100def test_shell_command_subprocess_termination_not_happening_on_file_opened_event(
101    tmpdir,
102):
103    # FIXME: see issue #949, and find a way to better handle that scenario
104    script = make_dummy_script(tmpdir, n=1)
105    command = f"{sys.executable} {script}"
106    trick = ShellCommandTrick(command, wait_for_process=False)
107    assert not trick.is_process_running()
108    trick.on_any_event(FileOpenedEvent("foo/bar.baz"))
109    assert not trick.is_process_running()
110    time.sleep(5)
111    assert not trick.is_process_running()
112
113
114def test_auto_restart_not_happening_on_file_opened_event(tmpdir, capfd):
115    # FIXME: see issue #949, and find a way to better handle that scenario
116    script = make_dummy_script(tmpdir, n=2)
117    trick = AutoRestartTrick([sys.executable, script])
118    trick.start()
119    time.sleep(1)
120    trick.on_any_event(FileOpenedEvent("foo/bar.baz"))
121    trick.on_any_event(FileOpenedEvent("foo/bar2.baz"))
122    trick.on_any_event(FileOpenedEvent("foo/bar3.baz"))
123    time.sleep(1)
124    trick.stop()
125    cap = capfd.readouterr()
126    assert cap.out.splitlines(keepends=False).count("+++++ 0") == 1
127    assert trick.restart_count == 0
128
129
130def test_auto_restart_on_file_change(tmpdir, capfd):
131    """Simulate changing 3 files.
132
133    Expect 3 restarts.
134    """
135    script = make_dummy_script(tmpdir, n=2)
136    trick = AutoRestartTrick([sys.executable, script])
137    trick.start()
138    time.sleep(1)
139    trick.on_any_event(FileModifiedEvent("foo/bar.baz"))
140    trick.on_any_event(FileModifiedEvent("foo/bar2.baz"))
141    trick.on_any_event(FileModifiedEvent("foo/bar3.baz"))
142    time.sleep(1)
143    trick.stop()
144    cap = capfd.readouterr()
145    assert cap.out.splitlines(keepends=False).count("+++++ 0") >= 2
146    assert trick.restart_count == 3
147
148
149@pytest.mark.xfail(
150    condition=platform.is_darwin() or platform.is_windows() or sys.implementation.name == "pypy",
151    reason="known to be problematic, see #973",
152)
153def test_auto_restart_on_file_change_debounce(tmpdir, capfd):
154    """Simulate changing 3 files quickly and then another change later.
155
156    Expect 2 restarts due to debouncing.
157    """
158    script = make_dummy_script(tmpdir, n=2)
159    trick = AutoRestartTrick([sys.executable, script], debounce_interval_seconds=0.5)
160    trick.start()
161    time.sleep(1)
162    trick.on_any_event(FileModifiedEvent("foo/bar.baz"))
163    trick.on_any_event(FileModifiedEvent("foo/bar2.baz"))
164    time.sleep(0.1)
165    trick.on_any_event(FileModifiedEvent("foo/bar3.baz"))
166    time.sleep(1)
167    trick.on_any_event(FileModifiedEvent("foo/bar.baz"))
168    time.sleep(1)
169    trick.stop()
170    cap = capfd.readouterr()
171    assert cap.out.splitlines(keepends=False).count("+++++ 0") == 3
172    assert trick.restart_count == 2
173
174
175@pytest.mark.flaky(max_runs=5, min_passes=1)
176@pytest.mark.parametrize(
177    "restart_on_command_exit",
178    [
179        True,
180        pytest.param(
181            False,
182            marks=pytest.mark.xfail(
183                condition=platform.is_darwin() or platform.is_windows(),
184                reason="known to be problematic, see #972",
185            ),
186        ),
187    ],
188)
189def test_auto_restart_subprocess_termination(tmpdir, capfd, restart_on_command_exit):
190    """Run auto-restart with a script that terminates in about 2 seconds.
191
192    After 5 seconds, expect it to have been restarted at least once.
193    """
194    script = make_dummy_script(tmpdir, n=2)
195    trick = AutoRestartTrick([sys.executable, script], restart_on_command_exit=restart_on_command_exit)
196    trick.start()
197    time.sleep(5)
198    trick.stop()
199    cap = capfd.readouterr()
200    if restart_on_command_exit:
201        assert cap.out.splitlines(keepends=False).count("+++++ 0") > 1
202        assert trick.restart_count >= 1
203    else:
204        assert cap.out.splitlines(keepends=False).count("+++++ 0") == 1
205        assert trick.restart_count == 0
206
207
208def test_auto_restart_arg_parsing_basic():
209    args = watchmedo.cli.parse_args(["auto-restart", "-d", ".", "--recursive", "--debug-force-polling", "cmd"])
210    assert args.func is watchmedo.auto_restart
211    assert args.command == "cmd"
212    assert args.directories == ["."]
213    assert args.recursive
214    assert args.debug_force_polling
215
216
217def test_auto_restart_arg_parsing():
218    args = watchmedo.cli.parse_args(
219        [
220            "auto-restart",
221            "-d",
222            ".",
223            "--kill-after",
224            "12.5",
225            "--debounce-interval=0.2",
226            "cmd",
227        ]
228    )
229    assert args.func is watchmedo.auto_restart
230    assert args.command == "cmd"
231    assert args.directories == ["."]
232    assert args.kill_after == pytest.approx(12.5)
233    assert args.debounce_interval == pytest.approx(0.2)
234
235
236def test_shell_command_arg_parsing():
237    args = watchmedo.cli.parse_args(["shell-command", "--command='cmd'"])
238    assert args.command == "'cmd'"
239
240
241@pytest.mark.parametrize("cmdline", [["auto-restart", "-d", ".", "cmd"], ["log", "."]])
242@pytest.mark.parametrize(
243    "verbosity",
244    [
245        ([], "WARNING"),
246        (["-q"], "ERROR"),
247        (["--quiet"], "ERROR"),
248        (["-v"], "INFO"),
249        (["--verbose"], "INFO"),
250        (["-vv"], "DEBUG"),
251        (["-v", "-v"], "DEBUG"),
252        (["--verbose", "-v"], "DEBUG"),
253    ],
254)
255def test_valid_verbosity(cmdline, verbosity):
256    (verbosity_cmdline_args, expected_log_level) = verbosity
257    cmd = [cmdline[0], *verbosity_cmdline_args, *cmdline[1:]]
258    args = watchmedo.cli.parse_args(cmd)
259    log_level = watchmedo._get_log_level_from_args(args)  # noqa: SLF001
260    assert log_level == expected_log_level
261
262
263@pytest.mark.parametrize("cmdline", [["auto-restart", "-d", ".", "cmd"], ["log", "."]])
264@pytest.mark.parametrize(
265    "verbosity_cmdline_args",
266    [
267        ["-q", "-v"],
268        ["-v", "-q"],
269        ["-qq"],
270        ["-q", "-q"],
271        ["--quiet", "--quiet"],
272        ["--quiet", "-q"],
273        ["-vvv"],
274        ["-vvvv"],
275        ["-v", "-v", "-v"],
276        ["-vv", "-v"],
277        ["--verbose", "-vv"],
278    ],
279)
280def test_invalid_verbosity(cmdline, verbosity_cmdline_args):
281    cmd = [cmdline[0], *verbosity_cmdline_args, *cmdline[1:]]
282    with pytest.raises((watchmedo.LogLevelError, SystemExit)):  # noqa: PT012
283        args = watchmedo.cli.parse_args(cmd)
284        watchmedo._get_log_level_from_args(args)  # noqa: SLF001
285
286
287@pytest.mark.parametrize("command", ["tricks-from", "tricks"])
288def test_tricks_from_file(command, tmp_path):
289    tricks_file = tmp_path / "tricks.yaml"
290    tricks_file.write_text(
291        """
292tricks:
293- watchdog.tricks.LoggerTrick:
294    patterns: ["*.py", "*.js"]
295"""
296    )
297    args = watchmedo.cli.parse_args([command, str(tricks_file)])
298
299    checkpoint = False
300
301    def mocked_sleep(_):
302        nonlocal checkpoint
303        checkpoint = True
304        raise WatchdogShutdownError
305
306    with patch("time.sleep", mocked_sleep):
307        watchmedo.tricks_from(args)
308    assert checkpoint
309