xref: /aosp_15_r20/external/autotest/server/cros/dynamic_suite/frontend_wrappers.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import math
7import threading
8
9from . import common
10from autotest_lib.client.common_lib import env
11from autotest_lib.client.common_lib import error
12from autotest_lib.client.common_lib import utils
13from autotest_lib.client.common_lib.cros import retry
14from autotest_lib.frontend.afe.json_rpc import proxy
15from autotest_lib.server import frontend
16try:
17    from autotest_lib.utils.frozen_chromite.lib import retry_util
18    from autotest_lib.utils.frozen_chromite.lib import timeout_util
19except ImportError as e:
20    logging.warning('Unable to import chromite: %s', e)
21    retry_util = None
22    timeout_util = None
23
24try:
25    from autotest_lib.utils.frozen_chromite.lib import metrics
26except ImportError as e:
27    logging.warning('Unable to import metrics from '
28                 'autotest_lib.utils.frozen_chromite: %s', e)
29    metrics = utils.metrics_mock
30
31
def convert_timeout_to_retry(backoff, timeout_min, delay_sec):
    """Compute the number of retry attempts for use with chromite.retry_util.

    Returns the smallest integer n such that the total time slept by n
    attempts, delay_sec * (1 + backoff + ... + backoff**(n-1)), reaches
    timeout_min minutes.

    @param backoff: The exponential backoff factor. A factor of 1 means a
                    constant delay_sec between attempts.
    @param timeout_min: The maximum amount of time (in minutes) to sleep.
    @param delay_sec: The amount to sleep (in seconds) for the first attempt;
                      each later sleep is scaled by backoff.

    @return: The number of retry attempts, as an int.

    @raises ZeroDivisionError: if delay_sec is 0.
    @raises ValueError: if backoff <= 0, or if backoff < 1 and the
                        geometric series can never add up to timeout_min
                        (math domain error from log10).
    """
    # Use float arithmetic so that Python 2 does not truncate the ratio with
    # integer division when both arguments are ints.
    total_sleep = timeout_min * 60.0
    sleep_ratio = total_sleep / delay_sec

    if backoff == 1:
        # Constant delay: total_sleep = delay_sec * max_retry. The geometric
        # formula below would evaluate log10(1) / log10(1) == 0 / 0.
        return int(math.ceil(sleep_ratio))

    # Estimate the max_retry in the case of exponential backoff:
    # => total_sleep = delay_sec*sum(r=0..max_retry-1, backoff^r)
    # => total_sleep = delay_sec*( (1-backoff^max_retry) / (1-backoff) )
    # => max_retry*ln(backoff) = ln(1-(total_sleep/delay_sec)*(1-backoff))
    # => max_retry = ln(1-(total_sleep/delay_sec)*(1-backoff))/ln(backoff)
    numerator = math.log10(1 - sleep_ratio * (1 - backoff))
    denominator = math.log10(backoff)
    return int(math.ceil(numerator / denominator))
50
51
class RetryingAFE(frontend.AFE):
    """An AFE client (frontend.AFE) that transparently retries every RPC.

    The retry budget (timeout_min) and the base delay between attempts
    (delay_sec) are set per instance; run() describes how they are applied.
    """
    def __init__(self, timeout_min=30, delay_sec=10, **dargs):
        """Store the retry settings, then initialize the AFE client.

        @param timeout_min: minutes to keep retrying an RPC before giving up.
        @param delay_sec: base delay between attempts in seconds, before any
                          jitter is applied.
        @param dargs: passed through unchanged to frontend.AFE.__init__.
        """
        self.timeout_min = timeout_min
        self.delay_sec = delay_sec
        super(RetryingAFE, self).__init__(**dargs)


    def set_timeout(self, timeout_min):
        """Replace the retry budget used by subsequent RPCs.

        @param timeout_min: the new retry budget, in minutes.
        """
        self.timeout_min = timeout_min


    def run(self, call, **dargs):
        """Run an RPC against the AFE, retrying it on failure.

        In the main thread the call is driven by chromite's
        retry_util.GenericRetry with a backoff factor of 2. Unless running
        under mod_wsgi, it is also wrapped in a timeout_util.Timeout of
        timeout_min minutes. In any other thread, autotest's retry.retry
        decorator is used instead, and a hung call is not aborted.

        @param call: the RPC name, as a string.
        @param dargs: the RPC's keyword arguments.

        @return: the RPC result as returned by the retry wrapper.

        @raises ImportError: if chromite could not be imported.
        @raises timeout_util.TimeoutError: re-raised after incrementing the
                retry_timeout metric when the main-thread path times out.
        """
        if retry_util is None:
            raise ImportError('Unable to import chromite. Please consider '
                              'running build_externals to build site packages.')

        # GenericRetry retries any Exception except these types, which are
        # re-raised immediately.
        non_retryable = (ImportError, error.RPCException,
                         proxy.JSONRPCException, timeout_util.TimeoutError,
                         error.ControlFileNotFound)
        backoff_factor = 2
        attempts = convert_timeout_to_retry(backoff_factor, self.timeout_min,
                                            self.delay_sec)

        def _run(self, call, **dargs):
            return super(RetryingAFE, self).run(call, **dargs)

        def handler(exc):
            """Decide whether GenericRetry should try again after exc.

            @param exc: the exception raised by the last attempt.

            @return: True when exc is an Exception that is not one of the
                     non-retryable types, False otherwise.
            """
            if isinstance(exc, non_retryable):
                return False
            return isinstance(exc, Exception)

        # Off the main thread, signals cannot be used to abort the call, so a
        # plain retry loop is used that does not enforce the timeout if the
        # call hangs.
        # NOTE(review): this raiselist differs from non_retryable above (it
        # has proxy.ValidationError but lacks proxy.JSONRPCException,
        # timeout_util.TimeoutError and error.ControlFileNotFound); confirm
        # whether that difference is intentional.
        @retry.retry(Exception, timeout_min=self.timeout_min,
                     delay_sec=self.delay_sec,
                     raiselist=[ImportError, error.RPCException,
                                proxy.ValidationError])
        def _run_in_child_thread(self, call, **dargs):
            return super(RetryingAFE, self).run(call, **dargs)

        if not isinstance(threading.current_thread(), threading._MainThread):
            return _run_in_child_thread(self, call, **dargs)

        # Keyword arguments for GenericRetry: initial sleep and backoff factor.
        dargs.update(sleep=self.delay_sec, backoff_factor=backoff_factor)
        try:
            # timeout_util.Timeout relies on SIGALRM, which does not work at
            # all under mod_wsgi (it only emits log spam), so skip it there.
            if not env.IN_MOD_WSGI:
                with timeout_util.Timeout(self.timeout_min * 60):
                    return retry_util.GenericRetry(handler, attempts, _run,
                                                   self, call, **dargs)
            return retry_util.GenericRetry(handler, attempts, _run,
                                           self, call, **dargs)
        except timeout_util.TimeoutError:
            counter = metrics.Counter(
                    'chromeos/autotest/retrying_afe/retry_timeout')
            # The job_details field is reserved for future use.
            fields = {'destination_server': self.server.split(':')[0],
                      'call': call,
                      'job_details': ''}
            counter.increment(fields=fields)
            raise
145
class RetryingTKO(frontend.TKO):
    """A TKO client (frontend.TKO) that transparently retries every RPC.

    The retry budget (timeout_min) and the base delay between attempts
    (delay_sec) are set per instance.
    """
    def __init__(self, timeout_min=30, delay_sec=10, **dargs):
        """Store the retry settings, then initialize the TKO client.

        @param timeout_min: minutes to keep retrying an RPC before giving up.
        @param delay_sec: base delay between attempts in seconds, before any
                          jitter is applied.
        @param dargs: passed through unchanged to frontend.TKO.__init__.
        """
        self.timeout_min = timeout_min
        self.delay_sec = delay_sec
        super(RetryingTKO, self).__init__(**dargs)


    def run(self, call, **dargs):
        """Run an RPC against TKO, retrying it with autotest's retry.retry.

        Exceptions are retried except ImportError, error.RPCException and
        proxy.ValidationError, which are passed to retry.retry as its
        raiselist.

        @param call: the RPC name, as a string.
        @param dargs: the RPC's keyword arguments.

        @return: the RPC result as returned by the retry wrapper.
        """
        with_retries = retry.retry(Exception, timeout_min=self.timeout_min,
                                   delay_sec=self.delay_sec,
                                   raiselist=[ImportError, error.RPCException,
                                              proxy.ValidationError])

        def _run(self, call, **dargs):
            return super(RetryingTKO, self).run(call, **dargs)

        return with_retries(_run)(self, call, **dargs)
175