xref: /aosp_15_r20/external/pigweed/pw_rpc/py/pw_rpc/console_tools/watchdog.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Simple watchdog class."""
15
16import threading
17from typing import Any, Callable
18
19
20class Watchdog:
21    """Simple class that times out unless reset.
22
23    This class could be used, for example, to track a device's connection state
24    for devices that send a periodic heartbeat packet.
25    """
26
27    def __init__(
28        self,
29        on_reset: Callable[[], Any],
30        on_expiration: Callable[[], Any],
31        while_expired: Callable[[], Any] = lambda: None,
32        timeout_s: float = 1,
33        expired_timeout_s: float | None = None,
34    ):
35        """Creates a watchdog; start() must be called to start it.
36
37        Args:
38          on_reset: Function called when the watchdog is reset after having
39              expired.
40          on_expiration: Function called when the timeout expires.
41          while_expired: Function called repeatedly while the watchdog is
42              expired.
43          timeout_s: If reset() is not called for timeout_s, the watchdog
44              expires and calls the on_expiration callback.
45          expired_timeout_s: While expired, the watchdog calls the
46              while_expired callback every expired_timeout_s.
47        """
48        self._on_reset = on_reset
49        self._on_expiration = on_expiration
50        self._while_expired = while_expired
51
52        self.timeout_s = timeout_s
53
54        if expired_timeout_s is None:
55            self.expired_timeout_s = self.timeout_s * 10
56        else:
57            self.expired_timeout_s = expired_timeout_s
58
59        self.expired: bool = False
60        self._watchdog = threading.Timer(0, self._timeout_expired)
61
62    def start(self) -> None:
63        """Starts the watchdog; must be called for the watchdog to work."""
64        self._watchdog.cancel()
65        self._watchdog = threading.Timer(
66            self.expired_timeout_s if self.expired else self.timeout_s,
67            self._timeout_expired,
68        )
69        self._watchdog.daemon = True
70        self._watchdog.start()
71
72    def stop(self) -> None:
73        """Stops the watchdog.
74
75        This will not trigger the execution of any callbacks and will prevent
76        further execution of any callbacks (including `while_expired`) until
77        `start` is called again.
78        """
79        self._watchdog.cancel()
80
81    def reset(self) -> bool:
82        """Resets the timeout; calls the on_reset callback if expired.
83
84        Returns True if was expired.
85        """
86        if self.expired:
87            self.expired = False
88            self._on_reset()
89            return True
90
91        self.start()
92        return False
93
94    def _timeout_expired(self) -> None:
95        if self.expired:
96            self._while_expired()
97        else:
98            self.expired = True
99            self._on_expiration()
100
101        self.start()
102