xref: /aosp_15_r20/external/rappor/pipeline/task_spec.py (revision 2abb31345f6c95944768b5222a9a5ed3fc68cc00)
1#!/usr/bin/python
2"""Read a list of 'counts' paths on stdin, and write a task spec on stdout.
3
4Each line represents a task, or R process invocation.  The params on each line
5are passed to ./dist.sh decode-many or ./assoc.sh decode-many.
6"""
7
8import collections
9import csv
10import errno
11import optparse
12import os
13import pprint
14import re
15import sys
16
17import util
18
19
def _ReadDistMaps(f):
  """Parse dist-analysis.csv into a {var name: map filename} dict.

  Args:
    f: open file for dist-analysis.csv, with header 'var,map_filename'
       and one row per variable.

  Returns:
    Dict from variable name to map filename (relative to the map dir).

  Raises:
    RuntimeError: if the CSV header is not the expected one.
  """
  dist_maps = {}
  c = csv.reader(f)
  for i, row in enumerate(c):
    if i == 0:
      expected = ['var', 'map_filename']
      # Include the row we actually got, matching _ReadVarSchema's error style.
      if row != expected:
        raise RuntimeError('Expected CSV header %s, got %s' % (expected, row))
      continue  # skip header

    var_name, map_filename = row
    dist_maps[var_name] = map_filename
  return dist_maps
33
34
class DistMapLookup(object):
  """Lookup table from variable name to the map file to analyze it against.

  TODO: Support a LIST of maps.  Users should be able to specify more than one.
  """
  def __init__(self, f, map_dir):
    # dist_maps: var name -> map filename (relative); map_dir is prepended
    # on lookup.
    self.dist_maps = _ReadDistMaps(f)
    self.map_dir = map_dir

  def GetMapPath(self, var_name):
    """Return the full path of the map file for var_name (KeyError if absent)."""
    return os.path.join(self.map_dir, self.dist_maps[var_name])
47
48
def CreateFieldIdLookup(f):
  """Create a dictionary that specifies single variable analysis each var.

  Args:
    f: open file for the field-ID CSV output by update_rappor.par, with
       header 'metric,field,field_type,params,field_id'.

  Returns:
    A dictionary from field ID -> full field name.  Only rows whose
    field_type is 'string' are included.

  Raises:
    RuntimeError: if the CSV header is not the expected one.

  NOTE: Right now we're only doing single variable analysis for strings, so we
  don't have the "type".
  """
  field_id_lookup = {}
  c = csv.reader(f)
  for i, row in enumerate(c):
    if i == 0:
      expected = ['metric', 'field', 'field_type', 'params', 'field_id']
      # Include the row we actually got, matching _ReadVarSchema's error style.
      if row != expected:
        raise RuntimeError('Expected CSV header %s, got %s' % (expected, row))
      continue  # skip header

    metric, field, field_type, _, field_id = row

    # Only string variables get single variable (distribution) analysis.
    if field_type != 'string':
      continue

    # Paper over the difference between plain metrics (single variable) and
    # metrics with fields (multiple variables, for association analysis).
    if field:
      full_field_name = '%s.%s' % (metric, field)
    else:
      full_field_name = metric

    field_id_lookup[field_id] = full_field_name
  return field_id_lookup
84
85
def _ReadVarSchema(f):
  """Given the rappor-vars.csv file, return a list of metric/var/type.

  Args:
    f: open file for rappor-vars.csv (header 'metric,var,var_type,params').

  Returns:
    A pair (assoc_metrics, params_lookup):
      assoc_metrics: metric -> list of (variable name, type), grouping the
          variables of multi-dimensional metrics for association analysis.
      params_lookup: full variable name -> params filename stem.

  Raises:
    RuntimeError: if the CSV header is not the expected one.
  """
  # metric -> list of (variable name, type)
  assoc_metrics = collections.defaultdict(list)
  params_lookup = {}

  for i, row in enumerate(csv.reader(f)):
    if i == 0:
      expected = ['metric', 'var', 'var_type', 'params']
      if row != expected:
        raise RuntimeError('Expected CSV header %s, got %s' % (expected, row))
      continue  # skip header

    metric, var, var_type, params = row
    if var:
      full_var_name = '%s.%s' % (metric, var)
      # Also group multi-dimensional reports
      assoc_metrics[metric].append((var, var_type))
    else:
      # A plain metric with no field: the metric name is the variable name.
      full_var_name = metric

    params_lookup[full_var_name] = params

  return assoc_metrics, params_lookup
111
112
class VarSchema(object):
  """Object representing rappor-vars.csv.

  Right now we use it for slightly different purposes for dist and assoc
  analysis.
  """
  def __init__(self, f, params_dir):
    self.assoc_metrics, self.params_lookup = _ReadVarSchema(f)
    self.params_dir = params_dir

  def GetParamsPath(self, var_name):
    """Return the params CSV path for var_name (KeyError if unknown)."""
    # The lookup table stores the filename stem; add the .csv extension here.
    stem = self.params_lookup[var_name]
    return os.path.join(self.params_dir, '%s.csv' % stem)

  def GetAssocMetrics(self):
    """Return the metric -> [(var, type), ...] grouping for assoc analysis."""
    return self.assoc_metrics
129
130
def CountReports(f):
  """Sum the first CSV column over every line of f and return the total."""
  # Each line of a counts file starts with the number of reports in that row.
  return sum(int(line.split(',')[0]) for line in f)
137
138
# Counts paths look like .../YYYY-MM-DD/<field id>_counts.csv
DIST_INPUT_PATH_RE = re.compile(r'.*/(\d+-\d+-\d+)/(\S+)_counts.csv')


def DistInputIter(stdin):
  """Read lines from stdin and extract fields to construct analysis tasks."""
  for line in stdin:
    match = DIST_INPUT_PATH_RE.match(line)
    if match is None:
      raise RuntimeError('Invalid path %r' % line)

    date, field_id = match.groups()
    yield line.strip(), date, field_id
153
154
def DistTaskSpec(input_iter, field_id_lookup, var_schema, dist_maps, bad_c):
  """Print task spec for single variable RAPPOR to stdout."""
  num_bad = 0
  unique_ids = set()

  for counts_path, date, field_id in input_iter:
    unique_ids.add(field_id)

    # num_reports is used for filtering
    with open(counts_path) as f:
      num_reports = CountReports(f)

    if not field_id_lookup:
      # No lookup table: the field ID is already the field name.
      field_name = field_id
    else:
      field_name = field_id_lookup.get(field_id)
      if field_name is None:
        # The metric id is the md5 hash of the name.  We can miss some, e.g. due
        # to debug builds.
        if bad_c:
          bad_c.writerow((date, field_id, num_reports))
          num_bad += 1
        continue

    # NOTE: We could remove the params from the spec if decode_dist.R took the
    # --schema flag.  The var type is there too.
    params_path = var_schema.GetParamsPath(field_name)
    map_path = dist_maps.GetMapPath(field_name)

    yield num_reports, field_name, date, counts_path, params_path, map_path

  util.log('%d unique field IDs', len(unique_ids))
  if num_bad:
    util.log('Failed field ID -> field name lookup on %d files '
             '(check --field-ids file)', num_bad)
192
193
# Reports paths look like .../YYYY-MM-DD/<metric>_reports.csv
ASSOC_INPUT_PATH_RE = re.compile(r'.*/(\d+-\d+-\d+)/(\S+)_reports.csv')


def AssocInputIter(stdin):
  """Read lines from stdin and extract fields to construct analysis tasks."""
  for line in stdin:
    match = ASSOC_INPUT_PATH_RE.match(line)
    if match is None:
      raise RuntimeError('Invalid path %r' % line)

    date, metric_name = match.groups()
    yield line.strip(), date, metric_name
208
209
def CreateAssocVarPairs(rappor_metrics):
  """Yield a list of pairs of variables that should be associated.

  For now just do all (string x boolean) analysis.

  Args:
    rappor_metrics: dict of metric name -> list of (variable name, type),
        as returned by VarSchema.GetAssocMetrics().

  Returns:
    defaultdict of metric name -> list of (string var, boolean var) pairs.
  """
  var_pairs = collections.defaultdict(list)

  # NOTE: items() rather than iteritems() so this also runs under Python 3;
  # on Python 2 the behavior is identical.
  for metric, var_list in rappor_metrics.items():
    string_vars = []
    boolean_vars = []

    # Separate variables into strings and booleans
    for var_name, var_type in var_list:
      if var_type == 'string':
        string_vars.append(var_name)
      elif var_type == 'boolean':
        boolean_vars.append(var_name)
      else:
        util.log('Unknown variable type %r', var_type)

    # Cross product: pair every string variable with every boolean variable.
    for s in string_vars:
      for b in boolean_vars:
        var_pairs[metric].append((s, b))
  return var_pairs
234
235
# For debugging
def PrintAssocVarPairs(var_pairs):
  """Print each metric and its association pairs to stdout, one per line."""
  for metric, var_list in var_pairs.iteritems():
    print metric
    for var_name, var_type in var_list:
      # NOTE(review): despite the loop variable names, CreateAssocVarPairs
      # fills var_pairs with (string var, boolean var) pairs, not (name, type).
      print '\t', var_name, var_type
242
243
def AssocTaskSpec(input_iter, var_pairs, dist_maps, output_base_dir, bad_c):
  """Print the task spec for multiple variable RAPPOR to stdout."""
  # Flow:
  #
  # Long term: We should have assoc-analysis.xml, next to dist-analysis.xml?
  #
  # Short term: update_rappor.py should print every combination of string vs.
  # bool?  Or I guess we have it in rappor-vars.csv

  for reports_path, date, metric_name in input_iter:
    for var1, var2 in var_pairs[metric_name]:
      # Assuming var1 is a string.  TODO: Use an assoc file, not dist_maps?
      map1_path = dist_maps.GetMapPath('%s.%s' % (metric_name, var1))

      # e.g. domain_X_flags__DID_PROCEED
      # Don't use .. in filenames since it could be confusing.
      pair_name = '%s_X_%s' % (var1, var2.replace('..', '_'))
      output_dir = os.path.join(output_base_dir, metric_name, pair_name, date)

      yield metric_name, date, reports_path, var1, var2, map1_path, output_dir
266
267
def CreateOptionsParser():
  """Build the optparse parser for this script's command line flags."""
  p = optparse.OptionParser()

  def _AddPathOption(flag, dest, help_text):
    # Every flag is a string path defaulting to the empty string.
    p.add_option(flag, dest=dest, metavar='PATH', type='str', default='',
                 help=help_text)

  _AddPathOption(
      '--bad-report-out', 'bad_report',
      'Optionally write a report of input filenames with invalid field '
      'IDs to this file.')
  _AddPathOption(
      '--config-dir', 'config_dir',
      'Directory with metadata schema and params files to read.')
  _AddPathOption(
      '--map-dir', 'map_dir',
      'Directory with map files to read.')
  _AddPathOption(
      '--output-base-dir', 'output_base_dir',
      'Root of the directory tree where analysis output will be placed.')
  _AddPathOption(
      '--field-ids', 'field_ids',
      'Optional CSV file with field IDs (generally should not be used).')

  return p
294
295
def main(argv):
  """Read counts/reports paths on stdin; print one task spec line per task.

  argv[1] selects the spec type: 'dist' (single variable) or 'assoc'
  (multiple variable association analysis).
  """
  (opts, argv) = CreateOptionsParser().parse_args(argv)

  # Optionally open a CSV writer to record inputs whose field IDs can't be
  # resolved (consumed by DistTaskSpec).  NOTE(review): bad_f is never
  # explicitly closed; relies on process exit to flush.
  if opts.bad_report:
    bad_f = open(opts.bad_report, 'w')
    bad_c = csv.writer(bad_f)
  else:
    bad_c = None

  action = argv[1]

  if not opts.config_dir:
    raise RuntimeError('--config-dir is required')
  if not opts.map_dir:
    raise RuntimeError('--map-dir is required')
  if not opts.output_base_dir:
    raise RuntimeError('--output-base-dir is required')

  # This is shared between the two specs.
  path = os.path.join(opts.config_dir, 'dist-analysis.csv')
  with open(path) as f:
    dist_maps = DistMapLookup(f, opts.map_dir)

  path = os.path.join(opts.config_dir, 'rappor-vars.csv')
  with open(path) as f:
    var_schema = VarSchema(f, opts.config_dir)

  if action == 'dist':
    # Optional field-ID -> field-name table (see CreateFieldIdLookup); an
    # empty dict means field IDs are used directly as field names.
    if opts.field_ids:
      with open(opts.field_ids) as f:
        field_id_lookup = CreateFieldIdLookup(f)
    else:
      field_id_lookup = {}

    input_iter = DistInputIter(sys.stdin)
    for row in DistTaskSpec(input_iter, field_id_lookup, var_schema, dist_maps,
                            bad_c):
      # The spec is a series of space-separated tokens.
      tokens = row + (opts.output_base_dir,)
      print ' '.join(str(t) for t in tokens)

  elif action == 'assoc':
    # Parse input
    input_iter = AssocInputIter(sys.stdin)

    # Create M x N association tasks
    var_pairs = CreateAssocVarPairs(var_schema.GetAssocMetrics())

    # Now add the other constant stuff
    for row in AssocTaskSpec(
        input_iter, var_pairs, dist_maps, opts.output_base_dir, bad_c):

      num_reports = 0  # placeholder, not filtering yet
      tokens = (num_reports,) + row
      print ' '.join(str(t) for t in tokens)

  else:
    raise RuntimeError('Invalid action %r' % action)
354
355
if __name__ == '__main__':
  try:
    main(sys.argv)
  except IOError, e:
    # EPIPE happens when a downstream consumer of our stdout (e.g. head)
    # exits early; treat that as a normal, silent exit.
    if e.errno != errno.EPIPE:  # ignore broken pipe
      raise
  except RuntimeError, e:
    # Expected user-facing errors: print the message without a traceback.
    print >>sys.stderr, 'FATAL: %s' % e
    sys.exit(1)
365