1*9c5db199SXin Li# Lint as: python2, python3 2*9c5db199SXin Li# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 3*9c5db199SXin Li# Use of this source code is governed by a BSD-style license that can be 4*9c5db199SXin Li# found in the LICENSE file. 5*9c5db199SXin Li 6*9c5db199SXin Lifrom __future__ import absolute_import 7*9c5db199SXin Lifrom __future__ import division 8*9c5db199SXin Lifrom __future__ import print_function 9*9c5db199SXin Liimport logging 10*9c5db199SXin Liimport sys 11*9c5db199SXin Liimport threading 12*9c5db199SXin Liimport time 13*9c5db199SXin Lifrom autotest_lib.client.common_lib import error 14*9c5db199SXin Liimport six 15*9c5db199SXin Lifrom six.moves import range 16*9c5db199SXin Li 17*9c5db199SXin Li 18*9c5db199SXin Liclass BaseStressor(threading.Thread): 19*9c5db199SXin Li """ 20*9c5db199SXin Li Implements common functionality for *Stressor classes. 21*9c5db199SXin Li 22*9c5db199SXin Li @var stressor: callable which performs a single stress event. 23*9c5db199SXin Li """ 24*9c5db199SXin Li def __init__(self, stressor, on_exit=None, escalate_exceptions=True): 25*9c5db199SXin Li """ 26*9c5db199SXin Li Initialize the ControlledStressor. 27*9c5db199SXin Li 28*9c5db199SXin Li @param stressor: callable which performs a single stress event. 29*9c5db199SXin Li @param on_exit: callable which will be called when the thread finishes. 30*9c5db199SXin Li @param escalate_exceptions: whether to escalate exceptions to the parent 31*9c5db199SXin Li thread; defaults to True. 32*9c5db199SXin Li """ 33*9c5db199SXin Li super(BaseStressor, self).__init__() 34*9c5db199SXin Li self.daemon = True 35*9c5db199SXin Li self.stressor = stressor 36*9c5db199SXin Li self.on_exit = on_exit 37*9c5db199SXin Li self._escalate_exceptions = escalate_exceptions 38*9c5db199SXin Li self._exc_info = None 39*9c5db199SXin Li 40*9c5db199SXin Li 41*9c5db199SXin Li def start(self, start_condition=None, start_timeout_secs=None): 42*9c5db199SXin Li """ 43*9c5db199SXin Li Creates a new thread which will call the run() method. 44*9c5db199SXin Li 45*9c5db199SXin Li Optionally takes a wait condition before the stressor loop. Returns 46*9c5db199SXin Li immediately. 47*9c5db199SXin Li 48*9c5db199SXin Li @param start_condition: the new thread will wait until this optional 49*9c5db199SXin Li callable returns True before running the stressor. 50*9c5db199SXin Li @param start_timeout_secs: how long to wait for |start_condition| to 51*9c5db199SXin Li become True, or None to wait forever. 52*9c5db199SXin Li """ 53*9c5db199SXin Li self._start_condition = start_condition 54*9c5db199SXin Li self._start_timeout_secs = start_timeout_secs 55*9c5db199SXin Li super(BaseStressor, self).start() 56*9c5db199SXin Li 57*9c5db199SXin Li 58*9c5db199SXin Li def run(self): 59*9c5db199SXin Li """ 60*9c5db199SXin Li Wait for |_start_condition|, and then start the stressor loop. 61*9c5db199SXin Li 62*9c5db199SXin Li Overloaded from threading.Thread. This is run in a separate thread when 63*9c5db199SXin Li start() is called. 64*9c5db199SXin Li """ 65*9c5db199SXin Li try: 66*9c5db199SXin Li self._wait_for_start_condition() 67*9c5db199SXin Li self._loop_stressor() 68*9c5db199SXin Li except Exception as e: 69*9c5db199SXin Li if self._escalate_exceptions: 70*9c5db199SXin Li self._exc_info = sys.exc_info() 71*9c5db199SXin Li raise # Terminates this thread. Caller continues to run. 72*9c5db199SXin Li finally: 73*9c5db199SXin Li if self.on_exit: 74*9c5db199SXin Li self.on_exit() 75*9c5db199SXin Li 76*9c5db199SXin Li 77*9c5db199SXin Li def _wait_for_start_condition(self): 78*9c5db199SXin Li """ 79*9c5db199SXin Li Loop until _start_condition() returns True, or _start_timeout_secs 80*9c5db199SXin Li have elapsed. 81*9c5db199SXin Li 82*9c5db199SXin Li @raise error.TestFail if we time out waiting for the start condition 83*9c5db199SXin Li """ 84*9c5db199SXin Li if self._start_condition is None: 85*9c5db199SXin Li return 86*9c5db199SXin Li 87*9c5db199SXin Li elapsed_secs = 0 88*9c5db199SXin Li while not self._start_condition(): 89*9c5db199SXin Li if (self._start_timeout_secs and 90*9c5db199SXin Li elapsed_secs >= self._start_timeout_secs): 91*9c5db199SXin Li raise error.TestFail('start condition did not become true ' 92*9c5db199SXin Li 'within %d seconds' % 93*9c5db199SXin Li self._start_timeout_secs) 94*9c5db199SXin Li time.sleep(1) 95*9c5db199SXin Li elapsed_secs += 1 96*9c5db199SXin Li 97*9c5db199SXin Li 98*9c5db199SXin Li def _loop_stressor(self): 99*9c5db199SXin Li """ 100*9c5db199SXin Li Apply stressor in a loop. 101*9c5db199SXin Li 102*9c5db199SXin Li Overloaded by the particular *Stressor. 103*9c5db199SXin Li """ 104*9c5db199SXin Li raise NotImplementedError 105*9c5db199SXin Li 106*9c5db199SXin Li 107*9c5db199SXin Li def reraise(self): 108*9c5db199SXin Li """ 109*9c5db199SXin Li Reraise an exception raised in the thread's stress loop. 110*9c5db199SXin Li 111*9c5db199SXin Li This is a No-op if no exception was raised. 112*9c5db199SXin Li """ 113*9c5db199SXin Li if self._exc_info: 114*9c5db199SXin Li exc_info = self._exc_info 115*9c5db199SXin Li self._exc_info = None 116*9c5db199SXin Li six.reraise(exc_info[0], exc_info[1], exc_info[2]) 117*9c5db199SXin Li 118*9c5db199SXin Li 119*9c5db199SXin Liclass ControlledStressor(BaseStressor): 120*9c5db199SXin Li """ 121*9c5db199SXin Li Run a stressor in loop on a separate thread. 122*9c5db199SXin Li 123*9c5db199SXin Li Creates a new thread and calls |stressor| in a loop until stop() is called. 124*9c5db199SXin Li """ 125*9c5db199SXin Li def __init__(self, stressor, on_exit=None, escalate_exceptions=True): 126*9c5db199SXin Li """ 127*9c5db199SXin Li Initialize the ControlledStressor. 128*9c5db199SXin Li 129*9c5db199SXin Li @param stressor: callable which performs a single stress event. 130*9c5db199SXin Li @param on_exit: callable which will be called when the thread finishes. 131*9c5db199SXin Li @param escalate_exceptions: whether to escalate exceptions to the parent 132*9c5db199SXin Li thread when stop() is called; defaults to True. 133*9c5db199SXin Li """ 134*9c5db199SXin Li self._complete = threading.Event() 135*9c5db199SXin Li super(ControlledStressor, self).__init__(stressor, on_exit, 136*9c5db199SXin Li escalate_exceptions) 137*9c5db199SXin Li 138*9c5db199SXin Li 139*9c5db199SXin Li def _loop_stressor(self): 140*9c5db199SXin Li """Overloaded from parent.""" 141*9c5db199SXin Li iteration_num = 0 142*9c5db199SXin Li while not self._complete.is_set(): 143*9c5db199SXin Li iteration_num += 1 144*9c5db199SXin Li logging.info('Stressor iteration: %d', iteration_num) 145*9c5db199SXin Li self.stressor() 146*9c5db199SXin Li 147*9c5db199SXin Li 148*9c5db199SXin Li def start(self, start_condition=None, start_timeout_secs=None): 149*9c5db199SXin Li """Start applying the stressor. 150*9c5db199SXin Li 151*9c5db199SXin Li Overloaded from parent. 152*9c5db199SXin Li 153*9c5db199SXin Li @param start_condition: the new thread will wait to until this optional 154*9c5db199SXin Li callable returns True before running the stressor. 155*9c5db199SXin Li @param start_timeout_secs: how long to wait for |start_condition| to 156*9c5db199SXin Li become True, or None to wait forever. 157*9c5db199SXin Li """ 158*9c5db199SXin Li self._complete.clear() 159*9c5db199SXin Li super(ControlledStressor, self).start(start_condition, 160*9c5db199SXin Li start_timeout_secs) 161*9c5db199SXin Li 162*9c5db199SXin Li 163*9c5db199SXin Li def stop(self, timeout=45): 164*9c5db199SXin Li """ 165*9c5db199SXin Li Stop applying the stressor. 166*9c5db199SXin Li 167*9c5db199SXin Li @param timeout: maximum time to wait for a single run of the stressor to 168*9c5db199SXin Li complete, defaults to 45 seconds. 169*9c5db199SXin Li """ 170*9c5db199SXin Li self._complete.set() 171*9c5db199SXin Li self.join(timeout) 172*9c5db199SXin Li self.reraise() 173*9c5db199SXin Li 174*9c5db199SXin Li 175*9c5db199SXin Liclass CountedStressor(BaseStressor): 176*9c5db199SXin Li """ 177*9c5db199SXin Li Run a stressor in a loop on a separate thread a given number of times. 178*9c5db199SXin Li 179*9c5db199SXin Li Creates a new thread and calls |stressor| in a loop |iterations| times. The 180*9c5db199SXin Li calling thread can use wait() to block until the loop completes. If the 181*9c5db199SXin Li stressor thread terminates with an exception, wait() will propagate that 182*9c5db199SXin Li exception to the thread that called wait(). 183*9c5db199SXin Li """ 184*9c5db199SXin Li def _loop_stressor(self): 185*9c5db199SXin Li """Overloaded from parent.""" 186*9c5db199SXin Li for iteration_num in range(1, self._iterations + 1): 187*9c5db199SXin Li logging.info('Stressor iteration: %d of %d', 188*9c5db199SXin Li iteration_num, self._iterations) 189*9c5db199SXin Li self.stressor() 190*9c5db199SXin Li 191*9c5db199SXin Li 192*9c5db199SXin Li def start(self, iterations, start_condition=None, start_timeout_secs=None): 193*9c5db199SXin Li """ 194*9c5db199SXin Li Apply the stressor a given number of times. 195*9c5db199SXin Li 196*9c5db199SXin Li Overloaded from parent. 197*9c5db199SXin Li 198*9c5db199SXin Li @param iterations: number of times to apply the stressor. 199*9c5db199SXin Li @param start_condition: the new thread will wait to until this optional 200*9c5db199SXin Li callable returns True before running the stressor. 201*9c5db199SXin Li @param start_timeout_secs: how long to wait for |start_condition| to 202*9c5db199SXin Li become True, or None to wait forever. 203*9c5db199SXin Li """ 204*9c5db199SXin Li self._iterations = iterations 205*9c5db199SXin Li super(CountedStressor, self).start(start_condition, start_timeout_secs) 206*9c5db199SXin Li 207*9c5db199SXin Li 208*9c5db199SXin Li def wait(self, timeout=None): 209*9c5db199SXin Li """Wait until the stressor completes. 210*9c5db199SXin Li 211*9c5db199SXin Li @param timeout: maximum time for the thread to complete, by default 212*9c5db199SXin Li never times out. 213*9c5db199SXin Li """ 214*9c5db199SXin Li self.join(timeout) 215*9c5db199SXin Li self.reraise() 216