#!/usr/bin/python
"""Summarize the results of many RAPPOR analysis runs.

Takes a list of STATUS.txt files on stdin, and reads the corresponding spec.txt
and log.txt files. Writes a CSV to stdout. Row key is (metric, date).
"""

import collections
import csv
import json
import os
import re
import sys


# Parse bash 'time' output:
# real    0m11.578s

# TODO: Parse the time from metrics.json instead.
TIMING_RE = re.compile(
    r'real \s+ (\d+) m ([\d.]+) s', re.VERBOSE)

# TODO: Could have decode-dist and decode-assoc output the PID?
PID_RE = re.compile(
    r'write_pid.py: PID (\d+)')  # not VERBOSE, spaces are literal


def ParseMemCsv(f):
  """Compute summary stats for memory.

  vm5_peak_kib -> max(vm_peak_kib)  # over 5 second intervals.  Since the
    peak is tracked by the kernel, it's accurate except for tasks that spike
    in their last 4 seconds.

  vm5_mean_kib -> mean(vm_size_kib)  # over 5 second intervals
  """
  peak_by_pid = collections.defaultdict(list)
  size_by_pid = collections.defaultdict(list)

  # Parse the columns we care about, grouped by PID.
  c = csv.reader(f)
  for i, row in enumerate(c):
    if i == 0:
      continue  # skip header
    # Columns are: timestamp, pid, rss, peak, size.
    _, pid, _, peak, size = row
    if peak != '':
      peak_by_pid[pid].append(int(peak))
    if size != '':
      size_by_pid[pid].append(int(size))

  mem_by_pid = {}

  # Now compute the summaries.
  pids = peak_by_pid.keys()
  for pid in pids:
    peaks = peak_by_pid[pid]
    vm5_peak_kib = max(peaks)

    sizes = size_by_pid[pid]
    vm5_mean_kib = sum(sizes) / len(sizes)

    mem_by_pid[pid] = (vm5_peak_kib, vm5_mean_kib)

  return mem_by_pid


def CheckJobId(job_id, parts):
  """Sanity check that the job ID looks like a date or a smoke test."""
  if not job_id.startswith('201') and not job_id.startswith('smoke'):
    raise RuntimeError(
        "Expected job ID to start with '201' or 'smoke': got %r (%s)" %
        (job_id, parts))


def ReadStatus(f):
  status_line = f.readline().strip()
  return status_line.split()[0]  # OK, TIMEOUT, FAIL


def CombineDistTaskStatus(stdin, c_out, mem_by_pid):
  """Reads STATUS.txt paths from stdin and writes a CSV summary to c_out."""

  # For each task, this parses:
  # - the input path, for metric name and date
  # - spec.txt, for task params
  # - STATUS.txt, for task success/failure
  # - metrics.json, for output metrics
  # - log.txt, for timing (if the task ran to completion) and the PID, which
  #   is joined against the memory stats from ParseMemCsv

  header = (
      'job_id', 'params_file', 'map_file',
      'metric', 'date',
      'vm5_peak_kib', 'vm5_mean_kib',  # set when not skipped
      'seconds', 'status',
      # only set when OK
      'num_reports', 'num_rappor', 'allocated_mass',
      # only set when failed
      'fail_reason')
  c_out.writerow(header)

  for line in stdin:
    #
    # Each line of stdin is a STATUS.txt path; parse it.
    #
    status_path = line.strip()

    with open(status_path) as f:
      status = ReadStatus(f)

    # Path should look like this:
    # ~/rappor/cron/2015-05-20__19-22-01/raw/Settings.NewTabPage/2015-05-19/STATUS.txt
    parts = status_path.split('/')
    job_id = parts[-5]
    CheckJobId(job_id, parts)

    #
    # Parse the job spec
    #
    result_dir = os.path.dirname(status_path)
    spec_file = os.path.join(result_dir, 'spec.txt')
    with open(spec_file) as f:
      spec_line = f.readline()
      # See backfill.sh analyze-one for the order of these 7 fields.
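      # A spec line is whitespace-separated and might look like this
      # (hypothetical values; the last field is ignored by the unpacking):
      #
      #   1000 Settings.NewTabPage 2015-05-19 <counts path> <params path> <map path> <ignored>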
      (num_reports, metric_name, date, counts_path, params_path,
       map_path, _) = spec_line.split()

    # NOTE: These are all constant per metric.  Could have another CSV and
    # join, but denormalizing is OK for now.
    params_file = os.path.basename(params_path)
    map_file = os.path.basename(map_path)

    # Remove the extensions.
    params_file, _ = os.path.splitext(params_file)
    map_file, _ = os.path.splitext(map_file)

    #
    # Read the log
    #
    log_file = os.path.join(result_dir, 'log.txt')
    with open(log_file) as f:
      lines = f.readlines()

    # Search lines in reverse order for the total time.  The log could have
    # output from multiple 'time' statements, and we want the last one.
    seconds = None  # stays None for skipped tasks
    for i in xrange(len(lines) - 1, -1, -1):
      # TODO: Parse the R timing too.  Could use LOG_RECORD_RE.
      m = TIMING_RE.search(lines[i])
      if m:
        min_part, sec_part = m.groups()
        seconds = float(min_part) * 60 + float(sec_part)
        break

    # Extract the stack trace.
    if status == 'FAIL':
      # Stack trace looks like: "Calls: main -> RunOne ..."
      fail_reason = ''.join(line.strip() for line in lines if 'Calls' in line)
    else:
      fail_reason = None

    # Extract the PID and join it with the memory results.
    pid = None
    vm5_peak_kib = None
    vm5_mean_kib = None
    if mem_by_pid:
      for line in lines:
        m = PID_RE.match(line)
        if m:
          pid = m.group(1)
          # The PID may not appear in the memory CSV if the process ran for
          # less than 5 seconds (one sampling interval).
          try:
            vm5_peak_kib, vm5_mean_kib = mem_by_pid[pid]
          except KeyError:  # sometimes we don't add mem-track on the front
            vm5_peak_kib, vm5_mean_kib = None, None
          break
    else:
      pass  # we weren't passed memory.csv

    #
    # Read the metrics
    #
    metrics = {}
    metrics_file = os.path.join(result_dir, 'metrics.json')
    if os.path.isfile(metrics_file):
      with open(metrics_file) as f:
        metrics = json.load(f)

    num_rappor = metrics.get('num_detected')
    allocated_mass = metrics.get('allocated_mass')

    # Construct and write the row.
    row = (
        job_id, params_file, map_file,
        metric_name, date,
        vm5_peak_kib, vm5_mean_kib,
        seconds, status,
        num_reports, num_rappor, allocated_mass,
        fail_reason)

    c_out.writerow(row)


def CombineAssocTaskStatus(stdin, c_out):
  """Reads STATUS.txt paths from stdin and writes a CSV summary to c_out."""

  header = (
      'job_id', 'metric', 'date', 'status', 'num_reports',
      'total_elapsed_seconds', 'em_elapsed_seconds', 'var1', 'var2', 'd1',
      'd2')

  c_out.writerow(header)

  for line in stdin:
    status_path = line.strip()

    with open(status_path) as f:
      status = ReadStatus(f)

    parts = status_path.split('/')
    job_id = parts[-6]
    CheckJobId(job_id, parts)

    #
    # Parse the job spec
    #
    result_dir = os.path.dirname(status_path)
    spec_file = os.path.join(result_dir, 'assoc-spec.txt')
    with open(spec_file) as f:
      spec_line = f.readline()
      # See backfill.sh analyze-one for the order of these fields.
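      # The assoc spec line is whitespace-separated; the unpacking below
      # expects 13 fields.  This layout is inferred from the variable names,
      # so treat it as a sketch:
      #
      #   <5 job constants> num_reports metric date reports var1 var2 map1 output_dir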
      # The first 5 fields are job constants; skip them.
      (_, _, _, _, _,
       dummy_num_reports, metric_name, date, reports, var1, var2, map1,
       output_dir) = spec_line.split()

    #
    # Parse decode-assoc metrics
    #
    metrics = {}
    metrics_file = os.path.join(result_dir, 'assoc-metrics.json')
    if os.path.isfile(metrics_file):
      with open(metrics_file) as f:
        metrics = json.load(f)

    # The metrics file has the actual number of reports processed.
    num_reports = metrics.get('num_reports')
    total_elapsed_seconds = metrics.get('total_elapsed_time')
    em_elapsed_seconds = metrics.get('em_elapsed_time')
    estimate_dimensions = metrics.get('estimate_dimensions')
    if estimate_dimensions:
      d1, d2 = estimate_dimensions
    else:
      d1, d2 = (0, 0)  # unknown

    row = (
        job_id, metric_name, date, status, num_reports, total_elapsed_seconds,
        em_elapsed_seconds, var1, var2, d1, d2)
    c_out.writerow(row)


def main(argv):
  action = argv[1]

  try:
    mem_csv = argv[2]
  except IndexError:
    mem_by_pid = None
  else:
    with open(mem_csv) as f:
      mem_by_pid = ParseMemCsv(f)

  if action == 'dist':
    c_out = csv.writer(sys.stdout)
    CombineDistTaskStatus(sys.stdin, c_out, mem_by_pid)

  elif action == 'assoc':
    c_out = csv.writer(sys.stdout)
    CombineAssocTaskStatus(sys.stdin, c_out)

  else:
    raise RuntimeError('Invalid action %r' % action)


if __name__ == '__main__':
  try:
    main(sys.argv)
  except RuntimeError, e:
    print >>sys.stderr, 'FATAL: %s' % e
    sys.exit(1)