# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import math
import threading

from . import common
from autotest_lib.client.common_lib import env
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.cros import retry
from autotest_lib.frontend.afe.json_rpc import proxy
from autotest_lib.server import frontend
try:
    from autotest_lib.utils.frozen_chromite.lib import retry_util
    from autotest_lib.utils.frozen_chromite.lib import timeout_util
except ImportError as e:
    logging.warning('Unable to import chromite: %s', e)
    retry_util = None
    timeout_util = None

try:
    from autotest_lib.utils.frozen_chromite.lib import metrics
except ImportError as e:
    logging.warning('Unable to import metrics from '
                    'autotest_lib.utils.frozen_chromite: %s', e)
    metrics = utils.metrics_mock


def convert_timeout_to_retry(backoff, timeout_min, delay_sec):
    """Compute the number of retry attempts for use with chromite.retry_util.

    The sleeps between attempts form a geometric series
    (delay_sec, delay_sec*backoff, delay_sec*backoff^2, ...). This returns
    the smallest number of terms whose sum reaches timeout_min minutes.

    @param backoff: The exponential backoff factor. A factor of exactly 1
                    (constant delay) is handled as a plain linear series.
    @param timeout_min: The maximum amount of time (in minutes) to sleep.
    @param delay_sec: The amount to sleep (in seconds) between each attempt.
                      Must be non-zero.

    @return: The number of retry attempts in the case of exponential backoff.

    @raises ZeroDivisionError: if delay_sec is 0.
    @raises ValueError: if the argument of the logarithm is not positive
                        (possible when backoff < 1, where the total sleep
                        time is bounded and may never reach the timeout).
    """
    total_sleep = timeout_min * 60
    # float() forces true division; with plain ints Python 2 would floor
    # the ratio and under-estimate the number of retries.
    sleep_ratio = float(total_sleep) / delay_sec

    if backoff == 1:
        # With a constant delay the series is linear and the closed form
        # below degenerates to log(1)/log(1) == 0/0, so count directly.
        return int(math.ceil(sleep_ratio))

    # Estimate the max_retry in the case of exponential backoff:
    # => total_sleep = sleep*sum(r=0..max_retry-1, backoff^r)
    # => total_sleep = sleep*( (1-backoff^max_retry) / (1-backoff) )
    # => max_retry*ln(backoff) = ln(1-(total_sleep/sleep)*(1-backoff))
    # => max_retry = ln(1-(total_sleep/sleep)*(1-backoff))/ln(backoff)
    # The base of the logarithm cancels out in the ratio, so log10 is fine.
    numerator = math.log10(1 - sleep_ratio * (1 - backoff))
    denominator = math.log10(backoff)
    return int(math.ceil(numerator / denominator))


class RetryingAFE(frontend.AFE):
    """Wrapper around frontend.AFE that retries all RPCs.

    Timeout for retries and delay between retries are configurable.
    """
    def __init__(self, timeout_min=30, delay_sec=10, **dargs):
        """Constructor

        @param timeout_min: timeout in minutes until giving up.
        @param delay_sec: pre-jittered delay between retries in seconds.
        @param dargs: remaining keyword arguments, forwarded unchanged to
                      frontend.AFE.__init__.
        """
        self.timeout_min = timeout_min
        self.delay_sec = delay_sec
        super(RetryingAFE, self).__init__(**dargs)


    def set_timeout(self, timeout_min):
        """Set timeout minutes for the AFE server.

        @param timeout_min: The timeout minutes for AFE server.
        """
        self.timeout_min = timeout_min


    def run(self, call, **dargs):
        """Method for running RPC call.

        On the main thread the call is retried through
        retry_util.GenericRetry with exponential backoff and, outside of
        mod_wsgi, bounded by timeout_util.Timeout. On any other thread it
        falls back to the retry.retry decorator.

        @param call: A string RPC call.
        @param dargs: the parameters of the RPC call.

        @return: The value returned by frontend.AFE.run for this call.

        @raises ImportError: if chromite's retry_util could not be imported.
        @raises timeout_util.TimeoutError: re-raised from the main-thread
                path after the retry_timeout counter has been incremented.
        """
        if retry_util is None:
            raise ImportError('Unable to import chromite. Please consider '
                              'running build_externals to build site packages.')
        # exc_retry: We retry if this exception is raised.
        # raiselist: Exceptions that we raise immediately if caught.
        exc_retry = Exception
        raiselist = (ImportError, error.RPCException, proxy.JSONRPCException,
                     timeout_util.TimeoutError, error.ControlFileNotFound)
        backoff = 2
        max_retry = convert_timeout_to_retry(backoff, self.timeout_min,
                                             self.delay_sec)

        def _run(self, call, **dargs):
            # Plain, un-retried RPC; GenericRetry drives the retry loop.
            return super(RetryingAFE, self).run(call, **dargs)

        def handler(exc):
            """Check if exc is an exc_retry or if it's in raiselist.

            @param exc: An exception.

            @return: True if exc is an exc_retry and is not
                     in raiselist. False otherwise.
            """
            is_exc_to_check = isinstance(exc, exc_retry)
            is_in_raiselist = isinstance(exc, raiselist)
            return is_exc_to_check and not is_in_raiselist

        # If the call is not in main thread, signal can't be used to abort the
        # call. In that case, use a basic retry which does not enforce timeout
        # if the process hangs.
        @retry.retry(Exception, timeout_min=self.timeout_min,
                     delay_sec=self.delay_sec,
                     raiselist=[ImportError, error.RPCException,
                                proxy.ValidationError])
        def _run_in_child_thread(self, call, **dargs):
            return super(RetryingAFE, self).run(call, **dargs)

        if isinstance(threading.current_thread(), threading._MainThread):
            # Set the keyword argument for GenericRetry. These ride along in
            # dargs next to the RPC parameters; GenericRetry is expected to
            # consume them before invoking _run.
            dargs['sleep'] = self.delay_sec
            dargs['backoff_factor'] = backoff
            # timeout_util.Timeout fundamentally relies on sigalrm, and doesn't
            # work at all in wsgi environment (just emits logs spam). So, don't
            # use it in wsgi.
            try:
                if env.IN_MOD_WSGI:
                    return retry_util.GenericRetry(handler, max_retry, _run,
                                                   self, call, **dargs)
                with timeout_util.Timeout(self.timeout_min * 60):
                    return retry_util.GenericRetry(handler, max_retry, _run,
                                                   self, call, **dargs)
            except timeout_util.TimeoutError:
                # Record the timeout before propagating it to the caller.
                c = metrics.Counter(
                        'chromeos/autotest/retrying_afe/retry_timeout')
                # Reserve field job_details for future use.
                f = {'destination_server': self.server.split(':')[0],
                     'call': call,
                     'job_details': ''}
                c.increment(fields=f)
                raise
        else:
            return _run_in_child_thread(self, call, **dargs)


class RetryingTKO(frontend.TKO):
    """Wrapper around frontend.TKO that retries all RPCs.

    Timeout for retries and delay between retries are configurable.
    """
    def __init__(self, timeout_min=30, delay_sec=10, **dargs):
        """Constructor

        @param timeout_min: timeout in minutes until giving up.
        @param delay_sec: pre-jittered delay between retries in seconds.
        @param dargs: remaining keyword arguments, forwarded unchanged to
                      frontend.TKO.__init__.
        """
        self.timeout_min = timeout_min
        self.delay_sec = delay_sec
        super(RetryingTKO, self).__init__(**dargs)


    def run(self, call, **dargs):
        """Method for running RPC call.

        The call is wrapped with the retry.retry decorator using this
        instance's timeout_min and delay_sec.

        @param call: A string RPC call.
        @param dargs: the parameters of the RPC call.

        @return: The value returned by frontend.TKO.run for this call.
        """
        @retry.retry(Exception, timeout_min=self.timeout_min,
                     delay_sec=self.delay_sec,
                     raiselist=[ImportError, error.RPCException,
                                proxy.ValidationError])
        def _run(self, call, **dargs):
            return super(RetryingTKO, self).run(call, **dargs)
        return _run(self, call, **dargs)