# Copyright 2020 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This contains common retrying helpers (retryers).

We use tenacity as a general-purpose retrying library.

> It [tenacity] originates from a fork of retrying which is sadly no
> longer maintained. Tenacity isn’t api compatible with retrying but
> adds significant new functionality and fixes a number of longstanding bugs.
> - https://tenacity.readthedocs.io/en/latest/index.html
"""
import datetime
import logging
from typing import Any, Callable, List, Optional, Tuple, Type

import tenacity
from tenacity import _utils as tenacity_utils
from tenacity import compat as tenacity_compat
from tenacity import stop
from tenacity import wait
from tenacity.retry import retry_base

retryers_logger = logging.getLogger(__name__)
# Type aliases
timedelta = datetime.timedelta
Retrying = tenacity.Retrying
CheckResultFn = Callable[[Any], bool]
_ExceptionClasses = Tuple[Type[Exception], ...]


def _build_retry_conditions(
        *,
        retry_on_exceptions: Optional[_ExceptionClasses] = None,
        check_result: Optional[CheckResultFn] = None) -> List[retry_base]:
    """Builds the list of tenacity retry conditions shared by all retryers.

    Args:
        retry_on_exceptions: Exception classes that trigger a retry.
            When None, every Exception triggers a retry.
        check_result: Optional predicate applied to the returned value;
            the call is retried while the predicate is not satisfied.

    Returns:
        A list with the exception-based condition, followed by the
        result-based condition when check_result is given.
    """
    # Retry on all exceptions by default
    if retry_on_exceptions is None:
        retry_on_exceptions = (Exception,)

    retry_conditions = [tenacity.retry_if_exception_type(retry_on_exceptions)]
    if check_result is not None:
        if retry_on_exceptions:
            # When retry_on_exceptions is set, also catch them while executing
            # check_result callback.
            check_result = _safe_check_result(check_result, retry_on_exceptions)
        retry_conditions.append(tenacity.retry_if_not_result(check_result))
    return retry_conditions


def exponential_retryer_with_timeout(
        *,
        wait_min: timedelta,
        wait_max: timedelta,
        timeout: timedelta,
        retry_on_exceptions: Optional[_ExceptionClasses] = None,
        check_result: Optional[CheckResultFn] = None,
        logger: Optional[logging.Logger] = None,
        log_level: Optional[int] = logging.DEBUG) -> Retrying:
    """Creates a retryer with exponential waits that stops after a timeout.

    Args:
        wait_min: Passed (in seconds) as the `min` of wait_exponential.
        wait_max: Passed (in seconds) as the `max` of wait_exponential.
        timeout: Total delay after which retrying stops.
        retry_on_exceptions: Exception classes that trigger a retry.
            When None, every Exception triggers a retry.
        check_result: Optional predicate applied to the returned value;
            the call is retried while the predicate is not satisfied.
        logger: Logger used before each sleep. Defaults to this module's
            logger.
        log_level: Level used for the before-sleep log record. Defaults to
            logging.DEBUG, also when None is passed explicitly.

    Returns:
        A configured tenacity Retrying object. When retrying stops, its
        error callback raises this module's RetryError.
    """
    if logger is None:
        logger = retryers_logger
    if log_level is None:
        log_level = logging.DEBUG

    retry_conditions = _build_retry_conditions(
        retry_on_exceptions=retry_on_exceptions, check_result=check_result)
    retry_error_callback = _on_error_callback(timeout=timeout,
                                              check_result=check_result)
    return Retrying(retry=tenacity.retry_any(*retry_conditions),
                    wait=wait.wait_exponential(min=wait_min.total_seconds(),
                                               max=wait_max.total_seconds()),
                    stop=stop.stop_after_delay(timeout.total_seconds()),
                    before_sleep=_before_sleep_log(logger, log_level),
                    retry_error_callback=retry_error_callback)


def constant_retryer(*,
                     wait_fixed: timedelta,
                     attempts: int = 0,
                     timeout: Optional[timedelta] = None,
                     retry_on_exceptions: Optional[_ExceptionClasses] = None,
                     check_result: Optional[CheckResultFn] = None,
                     logger: Optional[logging.Logger] = None,
                     log_level: Optional[int] = logging.DEBUG) -> Retrying:
    """Creates a retryer that waits a fixed interval between attempts.

    Retrying stops when either limit (attempts or timeout) is reached,
    whichever is configured; at least one of them must be set.

    Args:
        wait_fixed: Delay between consecutive attempts.
        attempts: Maximum number of attempts. Values below 1 disable the
            attempt limit.
        timeout: Total delay after which retrying stops. None disables
            the time limit.
        retry_on_exceptions: Exception classes that trigger a retry.
            When None, every Exception triggers a retry.
        check_result: Optional predicate applied to the returned value;
            the call is retried while the predicate is not satisfied.
        logger: Logger used before each sleep. Defaults to this module's
            logger.
        log_level: Level used for the before-sleep log record. Defaults to
            logging.DEBUG, also when None is passed explicitly.

    Returns:
        A configured tenacity Retrying object. When retrying stops, its
        error callback raises this module's RetryError.

    Raises:
        ValueError: Neither a positive attempts nor a timeout is given.
    """
    if logger is None:
        logger = retryers_logger
    if log_level is None:
        log_level = logging.DEBUG
    if attempts < 1 and timeout is None:
        raise ValueError('The number of attempts or the timeout must be set')
    stops = []
    if attempts > 0:
        stops.append(stop.stop_after_attempt(attempts))
    if timeout is not None:
        stops.append(stop.stop_after_delay(timeout.total_seconds()))

    retry_conditions = _build_retry_conditions(
        retry_on_exceptions=retry_on_exceptions, check_result=check_result)
    retry_error_callback = _on_error_callback(timeout=timeout,
                                              attempts=attempts,
                                              check_result=check_result)
    return Retrying(retry=tenacity.retry_any(*retry_conditions),
                    wait=wait.wait_fixed(wait_fixed.total_seconds()),
                    stop=stop.stop_any(*stops),
                    before_sleep=_before_sleep_log(logger, log_level),
                    retry_error_callback=retry_error_callback)


def _on_error_callback(*,
                       timeout: Optional[timedelta] = None,
                       attempts: int = 0,
                       check_result: Optional[CheckResultFn] = None):
    """A helper to propagate the initial state to the RetryError, so that
    it can assemble a helpful message containing timeout/number of attempts.

    Returns:
        A callable accepting the tenacity retry state; it always raises
        RetryError built from that state and the arguments given here.
    """

    def error_handler(retry_state: tenacity.RetryCallState):
        raise RetryError(retry_state,
                         timeout=timeout,
                         attempts=attempts,
                         check_result=check_result)

    return error_handler


def _safe_check_result(check_result: CheckResultFn,
                       retry_on_exceptions: _ExceptionClasses) -> CheckResultFn:
    """Wraps check_result callback to catch and handle retry_on_exceptions.

    Normally tenacity doesn't retry when retry_if_result/retry_if_not_result
    raise an error. This wraps the callback to automatically catch Exceptions
    specified in the retry_on_exceptions argument.

    Ideally we should make all check_result callbacks to not throw, but
    in case it does, we'd rather be annoying in the logs, than break the test.

    Returns:
        A callable with the same signature as check_result that returns
        False (after logging a warning) when check_result raises one of
        retry_on_exceptions.
    """

    def _check_result_wrapped(result):
        try:
            return check_result(result)
        except retry_on_exceptions:
            # Note the trailing spaces: adjacent literals are concatenated
            # verbatim, so each fragment must end with its own separator.
            retryers_logger.warning(
                "Result check callback %s raised an exception. "
                "This shouldn't happen, please handle any exceptions and "
                "return a boolean.",
                tenacity_utils.get_callback_name(check_result),
                exc_info=True)
            return False

    return _check_result_wrapped


def _before_sleep_log(logger, log_level, exc_info=False):
    """Same as tenacity.before_sleep_log, but only logs primitive return values.

    Non-primitive return values are replaced with their type, because
    logging the value itself is not useful when it is a dump of a large
    object.

    Args:
        logger: Logger that receives the record.
        log_level: Level of the record.
        exc_info: When true and the attempt raised, attach the exception
            info of the failed attempt to the record.

    Returns:
        A callable accepting the tenacity retry state, suitable for the
        `before_sleep` argument of Retrying.
    """

    def log_it(retry_state):
        outcome = retry_state.outcome
        # exc_info does not apply when no exception
        local_exc_info = False
        if outcome.failed:
            ex = outcome.exception()
            verb, value = 'raised', f'{type(ex).__name__}: {ex}'
            if exc_info:
                local_exc_info = tenacity_compat.get_exc_info_from_future(
                    outcome)
        else:
            result = outcome.result()
            if isinstance(result, (int, bool, str)):
                verb, value = 'returned', result
            else:
                verb, value = 'returned type', type(result)

        logger.log(log_level,
                   "Retrying %s in %s seconds as it %s %s.",
                   tenacity_utils.get_callback_name(retry_state.fn),
                   retry_state.next_action.sleep,
                   verb,
                   value,
                   exc_info=local_exc_info)

    return log_it


class RetryError(tenacity.RetryError):
    """tenacity.RetryError with a message describing why retrying stopped.

    The message names the retried callable, the configured timeout and/or
    number of attempts, and either the last exception or the fact that
    the check_result callback was not satisfied.
    """

    def __init__(self,
                 retry_state,
                 *,
                 timeout: Optional[timedelta] = None,
                 attempts: int = 0,
                 check_result: Optional[CheckResultFn] = None):
        """Assembles the error message from the final retry state.

        Args:
            retry_state: The tenacity retry state of the last attempt.
            timeout: The timeout the retryer was configured with, if any.
            attempts: The attempt limit the retryer was configured with;
                a falsy value means no limit was set.
            check_result: The result predicate the retryer was configured
                with, if any.
        """
        super().__init__(retry_state.outcome)
        callback_name = tenacity_utils.get_callback_name(retry_state.fn)
        self.message = f'Retry error calling {callback_name}:'
        # Compare against None (as constant_retryer does): a zero timedelta
        # is falsy but is still a configured timeout worth reporting.
        if timeout is not None:
            self.message += f' timeout {timeout} (h:mm:ss) exceeded'
            if attempts:
                self.message += ' or'
        if attempts:
            self.message += f' {attempts} attempts exhausted'

        self.message += '.'

        if retry_state.outcome.failed:
            ex = retry_state.outcome.exception()
            self.message += f' Last exception: {type(ex).__name__}: {ex}'
        elif check_result:
            self.message += ' Check result callback returned False.'

    def result(self, *, default=None):
        """Returns the last attempt's result, or default if it raised."""
        if self.last_attempt.failed:
            return default
        return self.last_attempt.result()

    def __str__(self):
        return self.message