1# Copyright 2020 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""This contains common retrying helpers (retryers).
15
16We use tenacity as a general-purpose retrying library.
17
18> It [tenacity] originates from a fork of retrying which is sadly no
19> longer maintained. Tenacity isn’t api compatible with retrying but >
20> adds significant new functionality and fixes a number of longstanding bugs.
21> - https://tenacity.readthedocs.io/en/latest/index.html
22"""
23import datetime
24import logging
25from typing import Any, Callable, List, Optional, Tuple, Type
26
27import tenacity
28from tenacity import _utils as tenacity_utils
29from tenacity import compat as tenacity_compat
30from tenacity import stop
31from tenacity import wait
32from tenacity.retry import retry_base
33
34retryers_logger = logging.getLogger(__name__)
35# Type aliases
36timedelta = datetime.timedelta
37Retrying = tenacity.Retrying
38CheckResultFn = Callable[[Any], bool]
39_ExceptionClasses = Tuple[Type[Exception], ...]
40
41
42def _build_retry_conditions(
43        *,
44        retry_on_exceptions: Optional[_ExceptionClasses] = None,
45        check_result: Optional[CheckResultFn] = None) -> List[retry_base]:
46
47    # Retry on all exceptions by default
48    if retry_on_exceptions is None:
49        retry_on_exceptions = (Exception,)
50
51    retry_conditions = [tenacity.retry_if_exception_type(retry_on_exceptions)]
52    if check_result is not None:
53        if retry_on_exceptions:
54            # When retry_on_exceptions is set, also catch them while executing
55            # check_result callback.
56            check_result = _safe_check_result(check_result, retry_on_exceptions)
57        retry_conditions.append(tenacity.retry_if_not_result(check_result))
58    return retry_conditions
59
60
61def exponential_retryer_with_timeout(
62        *,
63        wait_min: timedelta,
64        wait_max: timedelta,
65        timeout: timedelta,
66        retry_on_exceptions: Optional[_ExceptionClasses] = None,
67        check_result: Optional[CheckResultFn] = None,
68        logger: Optional[logging.Logger] = None,
69        log_level: Optional[int] = logging.DEBUG) -> Retrying:
70    if logger is None:
71        logger = retryers_logger
72    if log_level is None:
73        log_level = logging.DEBUG
74
75    retry_conditions = _build_retry_conditions(
76        retry_on_exceptions=retry_on_exceptions, check_result=check_result)
77    retry_error_callback = _on_error_callback(timeout=timeout,
78                                              check_result=check_result)
79    return Retrying(retry=tenacity.retry_any(*retry_conditions),
80                    wait=wait.wait_exponential(min=wait_min.total_seconds(),
81                                               max=wait_max.total_seconds()),
82                    stop=stop.stop_after_delay(timeout.total_seconds()),
83                    before_sleep=_before_sleep_log(logger, log_level),
84                    retry_error_callback=retry_error_callback)
85
86
87def constant_retryer(*,
88                     wait_fixed: timedelta,
89                     attempts: int = 0,
90                     timeout: Optional[timedelta] = None,
91                     retry_on_exceptions: Optional[_ExceptionClasses] = None,
92                     check_result: Optional[CheckResultFn] = None,
93                     logger: Optional[logging.Logger] = None,
94                     log_level: Optional[int] = logging.DEBUG) -> Retrying:
95    if logger is None:
96        logger = retryers_logger
97    if log_level is None:
98        log_level = logging.DEBUG
99    if attempts < 1 and timeout is None:
100        raise ValueError('The number of attempts or the timeout must be set')
101    stops = []
102    if attempts > 0:
103        stops.append(stop.stop_after_attempt(attempts))
104    if timeout is not None:
105        stops.append(stop.stop_after_delay(timeout.total_seconds()))
106
107    retry_conditions = _build_retry_conditions(
108        retry_on_exceptions=retry_on_exceptions, check_result=check_result)
109    retry_error_callback = _on_error_callback(timeout=timeout,
110                                              attempts=attempts,
111                                              check_result=check_result)
112    return Retrying(retry=tenacity.retry_any(*retry_conditions),
113                    wait=wait.wait_fixed(wait_fixed.total_seconds()),
114                    stop=stop.stop_any(*stops),
115                    before_sleep=_before_sleep_log(logger, log_level),
116                    retry_error_callback=retry_error_callback)
117
118
119def _on_error_callback(*,
120                       timeout: Optional[timedelta] = None,
121                       attempts: int = 0,
122                       check_result: Optional[CheckResultFn] = None):
123    """A helper to propagate the initial state to the RetryError, so that
124    it can assemble a helpful message containing timeout/number of attempts.
125    """
126
127    def error_handler(retry_state: tenacity.RetryCallState):
128        raise RetryError(retry_state,
129                         timeout=timeout,
130                         attempts=attempts,
131                         check_result=check_result)
132
133    return error_handler
134
135
136def _safe_check_result(check_result: CheckResultFn,
137                       retry_on_exceptions: _ExceptionClasses) -> CheckResultFn:
138    """Wraps check_result callback to catch and handle retry_on_exceptions.
139
140    Normally tenacity doesn't retry when retry_if_result/retry_if_not_result
141    raise an error. This wraps the callback to automatically catch Exceptions
142    specified in the retry_on_exceptions argument.
143
144    Ideally we should make all check_result callbacks to not throw, but
145    in case it does, we'd rather be annoying in the logs, than break the test.
146    """
147
148    def _check_result_wrapped(result):
149        try:
150            return check_result(result)
151        except retry_on_exceptions:
152            retryers_logger.warning(
153                "Result check callback %s raised an exception."
154                "This shouldn't happen, please handle any exceptions and "
155                "return return a boolean.",
156                tenacity_utils.get_callback_name(check_result),
157                exc_info=True)
158            return False
159
160    return _check_result_wrapped
161
162
163def _before_sleep_log(logger, log_level, exc_info=False):
164    """Same as tenacity.before_sleep_log, but only logs primitive return values.
165    This is not useful when the return value is a dump of a large object.
166    """
167
168    def log_it(retry_state):
169        if retry_state.outcome.failed:
170            ex = retry_state.outcome.exception()
171            verb, value = 'raised', '%s: %s' % (type(ex).__name__, ex)
172
173            if exc_info:
174                local_exc_info = tenacity_compat.get_exc_info_from_future(
175                    retry_state.outcome)
176            else:
177                local_exc_info = False
178        else:
179            local_exc_info = False  # exc_info does not apply when no exception
180            result = retry_state.outcome.result()
181            if isinstance(result, (int, bool, str)):
182                verb, value = 'returned', result
183            else:
184                verb, value = 'returned type', type(result)
185
186        logger.log(log_level,
187                   "Retrying %s in %s seconds as it %s %s.",
188                   tenacity_utils.get_callback_name(retry_state.fn),
189                   getattr(retry_state.next_action, 'sleep'),
190                   verb,
191                   value,
192                   exc_info=local_exc_info)
193
194    return log_it
195
196
197class RetryError(tenacity.RetryError):
198
199    def __init__(self,
200                 retry_state,
201                 *,
202                 timeout: Optional[timedelta] = None,
203                 attempts: int = 0,
204                 check_result: Optional[CheckResultFn] = None):
205        super().__init__(retry_state.outcome)
206        callback_name = tenacity_utils.get_callback_name(retry_state.fn)
207        self.message = f'Retry error calling {callback_name}:'
208        if timeout:
209            self.message += f' timeout {timeout} (h:mm:ss) exceeded'
210            if attempts:
211                self.message += ' or'
212        if attempts:
213            self.message += f' {attempts} attempts exhausted'
214
215        self.message += '.'
216
217        if retry_state.outcome.failed:
218            ex = retry_state.outcome.exception()
219            self.message += f' Last exception: {type(ex).__name__}: {ex}'
220        elif check_result:
221            self.message += ' Check result callback returned False.'
222
223    def result(self, *, default=None):
224        return default if self.last_attempt.failed else self.last_attempt.result(
225        )
226
227    def __str__(self):
228        return self.message
229