1# Copyright 2021 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""Simple watchdog class.""" 15 16import threading 17from typing import Any, Callable 18 19 20class Watchdog: 21 """Simple class that times out unless reset. 22 23 This class could be used, for example, to track a device's connection state 24 for devices that send a periodic heartbeat packet. 25 """ 26 27 def __init__( 28 self, 29 on_reset: Callable[[], Any], 30 on_expiration: Callable[[], Any], 31 while_expired: Callable[[], Any] = lambda: None, 32 timeout_s: float = 1, 33 expired_timeout_s: float | None = None, 34 ): 35 """Creates a watchdog; start() must be called to start it. 36 37 Args: 38 on_reset: Function called when the watchdog is reset after having 39 expired. 40 on_expiration: Function called when the timeout expires. 41 while_expired: Function called repeatedly while the watchdog is 42 expired. 43 timeout_s: If reset() is not called for timeout_s, the watchdog 44 expires and calls the on_expiration callback. 45 expired_timeout_s: While expired, the watchdog calls the 46 while_expired callback every expired_timeout_s. 47 """ 48 self._on_reset = on_reset 49 self._on_expiration = on_expiration 50 self._while_expired = while_expired 51 52 self.timeout_s = timeout_s 53 54 if expired_timeout_s is None: 55 self.expired_timeout_s = self.timeout_s * 10 56 else: 57 self.expired_timeout_s = expired_timeout_s 58 59 self.expired: bool = False 60 self._watchdog = threading.Timer(0, self._timeout_expired) 61 62 def start(self) -> None: 63 """Starts the watchdog; must be called for the watchdog to work.""" 64 self._watchdog.cancel() 65 self._watchdog = threading.Timer( 66 self.expired_timeout_s if self.expired else self.timeout_s, 67 self._timeout_expired, 68 ) 69 self._watchdog.daemon = True 70 self._watchdog.start() 71 72 def stop(self) -> None: 73 """Stops the watchdog. 74 75 This will not trigger the execution of any callbacks and will prevent 76 further execution of any callbacks (including `while_expired`) until 77 `start` is called again. 78 """ 79 self._watchdog.cancel() 80 81 def reset(self) -> bool: 82 """Resets the timeout; calls the on_reset callback if expired. 83 84 Returns True if was expired. 85 """ 86 if self.expired: 87 self.expired = False 88 self._on_reset() 89 return True 90 91 self.start() 92 return False 93 94 def _timeout_expired(self) -> None: 95 if self.expired: 96 self._while_expired() 97 else: 98 self.expired = True 99 self._on_expiration() 100 101 self.start() 102