#!/usr/bin/python
"""Summarize the results of many RAPPOR analysis runs.

Takes a list of STATUS.txt files on stdin, and reads the corresponding spec.txt
and log.txt files.  Writes a CSV to stdout.  Row key is (metric, date).
"""

import collections
import csv
import json
import os
import re
import sys


# Parse bash 'time' output:
# real    0m11.578s

# TODO: Parse the time from metrics.json instead.
TIMING_RE = re.compile(
    r'real \s+ (\d+) m ([\d.]+) s', re.VERBOSE)

# TODO: Could have decode-dist and decode-assoc output the PID?
PID_RE = re.compile(
    r'write_pid.py: PID (\d+)')  # not VERBOSE, spaces are literal
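# e.g. this matches a (made-up) log line like: 'write_pid.py: PID 12345'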


def ParseMemCsv(f):
  """Compute summary stats for memory.

  vm5_peak_kib -> max(vm_peak_kib)  # over 5 second intervals.  Since it uses
  the kernel, it's accurate except for tasks that spike in their last 4
  seconds.

  vm5_mean_kib -> mean(vm_size_kib)  # over 5 second intervals
  """
  peak_by_pid = collections.defaultdict(list)
  size_by_pid = collections.defaultdict(list)

  # Parse columns we care about, by PID
  c = csv.reader(f)
  for i, row in enumerate(c):
    if i == 0:
      continue  # skip header
    # looks like timestamp, pid, then (rss, peak, size)
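    # e.g. an illustrative (made-up) row: ['1432145000', '1234', '51200', '65536', '60000']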
    _, pid, _, peak, size = row
    if peak != '':
      peak_by_pid[pid].append(int(peak))
    if size != '':
      size_by_pid[pid].append(int(size))

  mem_by_pid = {}

  # Now compute summaries
  pids = peak_by_pid.keys()
  for pid in pids:
    peaks = peak_by_pid[pid]
    vm5_peak_kib = max(peaks)

    sizes = size_by_pid[pid]
    vm5_mean_kib = sum(sizes) / len(sizes)
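    # NOTE: Python 2 integer division; the mean is truncated to whole KiB.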

    mem_by_pid[pid] = (vm5_peak_kib, vm5_mean_kib)

  return mem_by_pid


def CheckJobId(job_id, parts):
  """Sanity check for date or smoke test."""
  if not job_id.startswith('201') and not job_id.startswith('smoke'):
    raise RuntimeError(
        "Expected job ID to start with '201' or 'smoke': got %r (%s)" %
        (job_id, parts))


def ReadStatus(f):
  status_line = f.readline().strip()
  return status_line.split()[0]  # OK, TIMEOUT, FAIL


def CombineDistTaskStatus(stdin, c_out, mem_by_pid):
  """Read STATUS.txt paths from stdin and write a CSV summary to c_out."""

  #util.log('%s', mem_by_pid)

  # Parses:
  # - input path for metric name and date
  # - spec.txt for task params
  # - STATUS.txt for task success/failure
  # - metrics.json for output metrics
  # - log.txt for timing, if it ran to completion
  #   - and for structured data
  # - join with mem by PID

  header = (
      'job_id', 'params_file', 'map_file',
      'metric', 'date',
      'vm5_peak_kib', 'vm5_mean_kib',  # set when not skipped
      'seconds', 'status',
      # only set when OK
      'num_reports', 'num_rappor', 'allocated_mass',
      # only set when failed
      'fail_reason')
  c_out.writerow(header)

  for line in stdin:
    #
    # Receive a STATUS.txt path on each line of stdin, and parse it.
    #
    status_path = line.strip()

    with open(status_path) as f:
      status = ReadStatus(f)

    # Path should look like this:
    # ~/rappor/cron/2015-05-20__19-22-01/raw/Settings.NewTabPage/2015-05-19/STATUS.txt
    parts = status_path.split('/')
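    # Counting back from STATUS.txt: [-2] is the date, [-3] the metric,
    # [-4] 'raw', and [-5] the job ID.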
    job_id = parts[-5]
    CheckJobId(job_id, parts)

    #
    # Parse the job spec
    #
    result_dir = os.path.dirname(status_path)
    spec_file = os.path.join(result_dir, 'spec.txt')
    with open(spec_file) as f:
      spec_line = f.readline()
      # See backfill.sh analyze-one for the order of these 7 fields.
      # There are 3 job constants on the front.
      (num_reports, metric_name, date, counts_path, params_path,
       map_path, _) = spec_line.split()

    # NOTE: These are all constant per metric.  Could have another CSV and
    # join.  But denormalizing is OK for now.
    params_file = os.path.basename(params_path)
    map_file = os.path.basename(map_path)

    # remove extension
    params_file, _ = os.path.splitext(params_file)
    map_file, _ = os.path.splitext(map_file)

    #
    # Read the log
    #
    log_file = os.path.join(result_dir, 'log.txt')
    with open(log_file) as f:
      lines = f.readlines()

    # Search lines in reverse order for the total time.  The log could have
    # output from multiple 'time' statements, and we want the last one.
    seconds = None  # for skipped
    for i in xrange(len(lines) - 1, -1, -1):
      # TODO: Parse the R timing too.  Could use LOG_RECORD_RE.
      m = TIMING_RE.search(lines[i])
      if m:
        min_part, sec_part = m.groups()
        seconds = float(min_part) * 60 + float(sec_part)
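        # e.g. 'real    0m11.578s' -> 0 * 60 + 11.578 = 11.578 seconds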
        break

    # Extract stack trace
    if status == 'FAIL':
      # Stack trace looks like: "Calls: main -> RunOne ..."
      fail_reason = ''.join(line.strip() for line in lines if 'Calls' in line)
    else:
      fail_reason = None

    # Extract the PID and join with the memory results
    pid = None
    vm5_peak_kib = None
    vm5_mean_kib = None
    if mem_by_pid:
      for line in lines:
        m = PID_RE.match(line)
        if m:
          pid = m.group(1)
          # The PID may be missing from mem_by_pid if the process ran for less
          # than 5 seconds (shorter than the sampling interval).
          try:
            vm5_peak_kib, vm5_mean_kib = mem_by_pid[pid]
          except KeyError:  # sometimes we don't add mem-track on the front
            vm5_peak_kib, vm5_mean_kib = None, None
          break
    else:
      pass  # we weren't passed memory.csv

    #
    # Read the metrics
    #
    metrics = {}
    metrics_file = os.path.join(result_dir, 'metrics.json')
    if os.path.isfile(metrics_file):
      with open(metrics_file) as f:
        metrics = json.load(f)

    num_rappor = metrics.get('num_detected')
    allocated_mass = metrics.get('allocated_mass')

    # Construct and write row
    row = (
        job_id, params_file, map_file,
        metric_name, date,
        vm5_peak_kib, vm5_mean_kib,
        seconds, status,
        num_reports, num_rappor, allocated_mass,
        fail_reason)

    c_out.writerow(row)


def CombineAssocTaskStatus(stdin, c_out):
  """Read STATUS.txt paths from stdin and write a CSV summary to c_out."""

  header = (
      'job_id', 'metric', 'date', 'status', 'num_reports',
      'total_elapsed_seconds', 'em_elapsed_seconds', 'var1', 'var2', 'd1',
      'd2')

  c_out.writerow(header)

  for line in stdin:
    status_path = line.strip()

    with open(status_path) as f:
      status = ReadStatus(f)

    parts = status_path.split('/')
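    # Assoc STATUS.txt paths appear to be nested one level deeper than the
    # dist paths above, so the job ID is at index -6 rather than -5.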
    job_id = parts[-6]
    CheckJobId(job_id, parts)

    #
    # Parse the job spec
    #
    result_dir = os.path.dirname(status_path)
    spec_file = os.path.join(result_dir, 'assoc-spec.txt')
    with open(spec_file) as f:
      spec_line = f.readline()
      # See backfill.sh for the order of these fields.  The first 5 are job
      # params; the rest are per-task values.
      (_, _, _, _, _,  # 5 job params, unused here
       dummy_num_reports, metric_name, date, reports, var1, var2, map1,
       output_dir) = spec_line.split()

    #
    # Parse decode-assoc metrics
    #
    metrics = {}
    metrics_file = os.path.join(result_dir, 'assoc-metrics.json')
    if os.path.isfile(metrics_file):
      with open(metrics_file) as f:
        metrics = json.load(f)

    # After we run it, we have the actual number of reports
    num_reports = metrics.get('num_reports')
    total_elapsed_seconds = metrics.get('total_elapsed_time')
    em_elapsed_seconds = metrics.get('em_elapsed_time')
    estimate_dimensions = metrics.get('estimate_dimensions')
    if estimate_dimensions:
      d1, d2 = estimate_dimensions
    else:
      d1, d2 = (0, 0)  # unknown

    row = (
        job_id, metric_name, date, status, num_reports, total_elapsed_seconds,
        em_elapsed_seconds, var1, var2, d1, d2)
    c_out.writerow(row)


def main(argv):
  action = argv[1]

  try:
    mem_csv = argv[2]
  except IndexError:
    mem_by_pid = None
  else:
    with open(mem_csv) as f:
      mem_by_pid = ParseMemCsv(f)

  if action == 'dist':
    c_out = csv.writer(sys.stdout)
    CombineDistTaskStatus(sys.stdin, c_out, mem_by_pid)

  elif action == 'assoc':
    c_out = csv.writer(sys.stdout)
    CombineAssocTaskStatus(sys.stdin, c_out)

  else:
    raise RuntimeError('Invalid action %r' % action)


if __name__ == '__main__':
  try:
    main(sys.argv)
  except RuntimeError as e:
    print >>sys.stderr, 'FATAL: %s' % e
    sys.exit(1)