#!/usr/bin/python2
"""
Usage: ./cron_scripts/log_distiller.py job_id path_to_logfile
    If the job_id is a suite it will find all subjobs.
You need to change the location of the log it will parse.
The job_id needs to be in the afe database.
"""
import abc
import datetime
import os
import re
import pprint
import subprocess
import sys
import time

import common
from autotest_lib.server import frontend


LOGFILE = './logs/scheduler.log.2014-04-17-16.51.47'
# logfile name format: scheduler.log.2014-02-14-18.10.56
time_format = '%Y-%m-%d-%H.%M.%S'
logfile_regex = r'scheduler.log.([0-9,.,-]+)'
logdir = os.path.join('/usr/local/autotest', 'logs')

class StateMachineViolation(Exception):
    pass


class LogLineException(Exception):
    pass


def should_process_log(time_str, time_format, cutoff_days=7):
    """Returns True if the log was created more than cutoff_days ago.

    @param time_str: A string representing the time.
        eg: 2014-02-14-18.10.56
    @param time_format: A string representing the format of the time string.
        ref: http://docs.python.org/2/library/datetime.html#strftime-strptime-behavior
    @param cutoff_days: Int representing the cutoff in days.

    @return: Returns True if time_str has aged more than cutoff_days.
    """
    log_time = datetime.datetime.strptime(time_str, time_format)
    now = datetime.datetime.strptime(time.strftime(time_format), time_format)
    cutoff = now - datetime.timedelta(days=cutoff_days)
    return log_time < cutoff


def apply_regex(regex, line):
    """Simple regex applicator.

    @param regex: Regex to apply.
    @param line: The line to apply regex on.

    @return: A tuple with the matching groups, if there was a match.
    """
    log_match = re.match(regex, line)
    if log_match:
        return log_match.groups()


class StateMachineParser(object):
    """Abstract class that enforces state transition ordering.

    Classes inheriting from StateMachineParser need to define an
    expected_transitions dictionary. The SMP will pop 'to' states
    from the dictionary as they occur, so you cannot repeat a state
    transition unless you specify it twice.
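
    For example (illustrative only; not one of the parsers defined below),
    given
        expected_transitions = {'Ready': ['Running', 'Running']}
    advance_state('Ready', 'Running') succeeds twice, but a third call
    reports ('Ready', 'Running') as an unexpected transition, because each
    successful call pops one entry from the list.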
    """
    __metaclass__ = abc.ABCMeta


    @abc.abstractmethod
    def __init__(self):
        self.visited_states = []
        self.expected_transitions = {}


    def advance_state(self, from_state, to_state):
        """Checks that a transition is valid.

        @param from_state: A string representing the state the host is leaving.
        @param to_state: The state the host is going to, represented as a string.

        @return: None if the transition was expected, otherwise a message or
            a (from_state, to_state) tuple describing the unexpected
            transition.
        """
        # TODO: Updating to the same state is a waste of bw.
        if from_state and from_state == to_state:
            return ('Updating to the same state is a waste of BW: %s->%s' %
                    (from_state, to_state))

        if (from_state in self.expected_transitions and
            to_state in self.expected_transitions[from_state]):
            self.expected_transitions[from_state].remove(to_state)
            self.visited_states.append(to_state)
        else:
            return (from_state, to_state)


class SingleJobHostSMP(StateMachineParser):
    def __init__(self):
        self.visited_states = []
        self.expected_transitions = {
            'Ready': ['Resetting', 'Verifying', 'Pending', 'Provisioning'],
            'Resetting': ['Ready', 'Provisioning'],
            'Pending': ['Running'],
            'Provisioning': ['Repairing'],
            'Running': ['Ready']
        }


    def check_transitions(self, hostline):
        if hostline.line_info['field'] == 'status':
            self.advance_state(hostline.line_info['state'],
                               hostline.line_info['value'])


class SingleJobHqeSMP(StateMachineParser):
    def __init__(self):
        self.visited_states = []
        self.expected_transitions = {
            'Queued': ['Starting', 'Resetting', 'Aborted'],
            'Resetting': ['Pending', 'Provisioning'],
            'Provisioning': ['Pending', 'Queued', 'Repairing'],
            'Pending': ['Starting'],
            'Starting': ['Running'],
            'Running': ['Gathering', 'Parsing'],
            'Gathering': ['Parsing'],
            'Parsing': ['Completed', 'Aborted']
        }


    def check_transitions(self, hqeline):
        invalid_states = self.advance_state(
                hqeline.line_info['from_state'], hqeline.line_info['to_state'])
        if not invalid_states:
            return

        # Deal with repair.
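        # A from_state of 'Queued' when 'Running' has already been visited
        # means the HQE was re-queued after it ran (e.g. through repair), so
        # it is retracing transitions that the single-use table above has
        # already consumed; flag that as a violation.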
        if (invalid_states[0] == 'Queued' and
            'Running' in self.visited_states):
            raise StateMachineViolation('Unrecognized state transition '
                                        '%s->%s, expected transitions are %s' %
                                        (invalid_states[0], invalid_states[1],
                                         self.expected_transitions))


class LogLine(object):
    """Line objects.

    All classes inheriting from LogLine represent a line of some sort.
    A line is responsible for parsing itself, and invoking an SMP to
    validate state transitions. A line can be part of several state machines.
    """
    line_format = '%s'


    def __init__(self, state_machine_parsers):
        """
        @param state_machine_parsers: A list of smp objects to use to validate
            state changes on these types of lines.
        """
        self.smps = state_machine_parsers

        # Stored in a dict because it is easier to flush.
        self.line_info = {}


    def parse_line(self, line):
        """Apply a line regex and save any information the parsed line contains.

        @param line: A string representing a line.
        """
        # Regex for all the things.
        line_rgx = '(.*)'
        parsed_line = apply_regex(line_rgx, line)
        if parsed_line:
            self.line_info['line'] = parsed_line[0]


    def flush(self):
        """Call any state machine parsers, persist line info if needed.
        """
        for smp in self.smps:
            smp.check_transitions(self)
        # TODO: persist this?
        self.line_info = {}


    def format_line(self):
        try:
            return self.line_format % self.line_info
        except KeyError:
            return self.line_info['line']


class TimeLine(LogLine):
    """Filters timestamps for scheduler logs.
    """

    def parse_line(self, line):
        super(TimeLine, self).parse_line(line)

        # Regex for isolating the date and time from scheduler logs, eg:
        # 02/16 16:04:36.573 INFO |scheduler_:0574|...
        line_rgx = '([0-9,/,:,., ]+)(.*)'
        parsed_line = apply_regex(line_rgx, self.line_info['line'])
        if parsed_line:
            self.line_info['time'] = parsed_line[0]
            self.line_info['line'] = parsed_line[1]


class HostLine(TimeLine):
    """Manages host line parsing.
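
    A HostLine records the host name, its current state and the updated
    field/value pair from scheduler lines that mention a host update. Pieced
    together from the example snippets in the comments below (so the exact
    shape is illustrative), such a line looks roughly like:
        02/16 16:04:36.573 INFO |scheduler_:0574| Host 172.22.4 in Running
        updating {'status': 'Running'}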
    """
    line_format = (' \t\t %(time)s %(host)s, currently in %(state)s, '
                   'updated %(field)s->%(value)s')


    def record_state_transition(self, line):
        """Apply the state_transition_rgx to a line and record state changes.

        @param line: The line we're expecting to contain a state transition.
        """
        state_transition_rgx = ".* ([a-zA-Z]+) updating {'([a-zA-Z]+)': ('[a-zA-Z]+'|[0-9])}.*"
        match = apply_regex(state_transition_rgx, line)
        if match:
            self.line_info['state'] = match[0]
            self.line_info['field'] = match[1]
            self.line_info['value'] = match[2].replace("'", "")


    def parse_line(self, line):
        super(HostLine, self).parse_line(line)

        # Regex for getting host status. Eg:
        # 172.22.4 in Running updating {'status': 'Running'}
        line_rgx = '.*Host (([0-9,.,a-z,-]+).*)'
        parsed_line = apply_regex(line_rgx, self.line_info['line'])
        if parsed_line:
            self.line_info['line'] = parsed_line[0]
            self.line_info['host'] = parsed_line[1]
            self.record_state_transition(self.line_info['line'])
            return self.format_line()


class HQELine(TimeLine):
    """Manages HQE line parsing.
    """
    line_format = ('%(time)s %(hqe)s, currently in %(from_state)s, '
                   'updated to %(to_state)s. Flags: %(flags)s')


    def record_state_transition(self, line):
        """Apply the state_transition_rgx to a line and record state changes.

        @param line: The line we're expecting to contain a state transition.
        """
        # Regex for getting hqe status. Eg:
        # status:Running [active] -> Gathering
        state_transition_rgx = ".*status:([a-zA-Z]+)( \[[a-z\,]+\])? -> ([a-zA-Z]+)"
        match = apply_regex(state_transition_rgx, line)
        if match:
            self.line_info['from_state'] = match[0]
            self.line_info['flags'] = match[1]
            self.line_info['to_state'] = match[2]


    def parse_line(self, line):
        super(HQELine, self).parse_line(line)
        line_rgx = r'.*\| HQE: (([0-9]+).*)'
        parsed_line = apply_regex(line_rgx, self.line_info['line'])
        if parsed_line:
            self.line_info['line'] = parsed_line[0]
            self.line_info['hqe'] = parsed_line[1]
            self.record_state_transition(self.line_info['line'])
            return self.format_line()


class LogCrawler(object):
    """Crawl logs.

    Log crawlers are meant to apply some basic preprocessing to a log, and
    crawl the output validating state changes.
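    For the scheduler logs handled here that preprocessing is just a shell
    pipeline of roughly the form
        cat <logfile> | grep "for job: <job_id>"
    (see SchedulerLogCrawler below), with the placeholders filled in from
    the crawler's constructor arguments.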
    Crawlers manage line and state machine creation. The initial filtering
    applied to the log needs to grab all lines that match an action, such as
    the running of a job.
    """

    def __init__(self, log_name):
        self.log = log_name
        self.filter_command = 'cat %s' % log_name


    def preprocess_log(self):
        """Apply some basic filtering to the log.
        """
        proc = subprocess.Popen(self.filter_command,
                                shell=True, stdout=subprocess.PIPE)
        out, err = proc.communicate()
        return out


class SchedulerLogCrawler(LogCrawler):
    """A log crawler for the scheduler logs.

    This crawler is only capable of processing information about a single job.
    """

    def __init__(self, log_name, **kwargs):
        super(SchedulerLogCrawler, self).__init__(log_name)
        self.job_id = kwargs['job_id']
        self.line_processors = [HostLine([SingleJobHostSMP()]),
                                HQELine([SingleJobHqeSMP()])]
        self.filter_command = ('%s | grep "for job: %s"' %
                               (self.filter_command, self.job_id))


    def parse_log(self):
        """Parse each line of the preprocessed log output.

        Pass each line through each possible line_processor. The one that
        matches will populate itself; calling flush then advances that
        line's state machines by one step.
        """
        out = self.preprocess_log()
        response = []
        for job_line in out.split('\n'):
            parsed_line = None
            for processor in self.line_processors:
                line = processor.parse_line(job_line)
                if line and parsed_line:
                    raise LogLineException('Multiple parsers claiming the line %s: '
                                           'previous parsing: %s, current parsing: %s ' %
                                           (job_line, parsed_line, line))
                elif line:
                    parsed_line = line
                try:
                    processor.flush()
                except StateMachineViolation as e:
                    response.append(str(e))
                    raise StateMachineViolation(response)
            response.append(parsed_line if parsed_line else job_line)
        return response


def process_logs():
    if len(sys.argv) < 2:
        print ('Usage: ./cron_scripts/log_distiller.py 0 8415620 '
               'You need to change the location of the log it will parse. '
               'The job_id needs to be in the afe database.')
        sys.exit(1)

    job_id = int(sys.argv[1])
    rpc = frontend.AFE()
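    # Look up the requested job; a job with no parent is treated as a suite
    # and expanded into all of its child jobs, otherwise only the given job
    # is processed.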
    suite_jobs = rpc.run('get_jobs', id=job_id)
    if not suite_jobs[0]['parent_job']:
        suite_jobs = rpc.run('get_jobs', parent_job=job_id)
    try:
        logfile = sys.argv[2]
    except IndexError:
        logfile = LOGFILE

    for job in suite_jobs:
        log_crawler = SchedulerLogCrawler(logfile, job_id=job['id'])
        for line in log_crawler.parse_log():
            print line
    return


if __name__ == '__main__':
    process_logs()