xref: /aosp_15_r20/external/rappor/pipeline/task_spec.py (revision 2abb31345f6c95944768b5222a9a5ed3fc68cc00)
1*2abb3134SXin Li#!/usr/bin/python
2*2abb3134SXin Li"""Read a list of 'counts' paths on stdin, and write a task spec on stdout.
3*2abb3134SXin Li
4*2abb3134SXin LiEach line represents a task, or R process invocation.  The params on each line
5*2abb3134SXin Liare passed to ./dist.sh decode-many or ./assoc.sh decode-many.
6*2abb3134SXin Li"""
7*2abb3134SXin Li
8*2abb3134SXin Liimport collections
9*2abb3134SXin Liimport csv
10*2abb3134SXin Liimport errno
11*2abb3134SXin Liimport optparse
12*2abb3134SXin Liimport os
13*2abb3134SXin Liimport pprint
14*2abb3134SXin Liimport re
15*2abb3134SXin Liimport sys
16*2abb3134SXin Li
17*2abb3134SXin Liimport util
18*2abb3134SXin Li
19*2abb3134SXin Li
20*2abb3134SXin Lidef _ReadDistMaps(f):
21*2abb3134SXin Li  dist_maps = {}
22*2abb3134SXin Li  c = csv.reader(f)
23*2abb3134SXin Li  for i, row in enumerate(c):
24*2abb3134SXin Li    if i == 0:
25*2abb3134SXin Li      expected = ['var', 'map_filename']
26*2abb3134SXin Li      if row != expected:
27*2abb3134SXin Li        raise RuntimeError('Expected CSV header %s' % expected)
28*2abb3134SXin Li      continue  # skip header
29*2abb3134SXin Li
30*2abb3134SXin Li    var_name, map_filename = row
31*2abb3134SXin Li    dist_maps[var_name] = map_filename
32*2abb3134SXin Li  return dist_maps
33*2abb3134SXin Li
34*2abb3134SXin Li
class DistMapLookup(object):
  """Resolve a variable name to the path of the map file to analyze against.

  TODO: Support a LIST of maps.  Users should be able to specify more than one.
  """

  def __init__(self, f, map_dir):
    """Load the var -> map filename table.

    Args:
      f: CSV file object with the header 'var,map_filename'.
      map_dir: directory that the map filenames are joined onto.
    """
    self.map_dir = map_dir
    self.dist_maps = _ReadDistMaps(f)

  def GetMapPath(self, var_name):
    """Return map_dir joined with the map filename registered for var_name.

    Raises:
      KeyError: if var_name has no entry in the table.
    """
    return os.path.join(self.map_dir, self.dist_maps[var_name])
47*2abb3134SXin Li
48*2abb3134SXin Li
def CreateFieldIdLookup(f):
  """Build a dictionary from field ID to full field name for string fields.

  Args:
    f: CSV file object (or iterable of lines) whose first row must be the
      header 'metric,field,field_type,params,field_id'.

  Returns:
    A dictionary from field ID -> full field name.  Rows whose field_type is
    not 'string' are skipped.

  Raises:
    RuntimeError: if the first row is not the expected header.

  NOTE: Right now we're only doing single variable analysis for strings, so we
  don't have the "type".
  """
  header = ['metric', 'field', 'field_type', 'params', 'field_id']
  rows = csv.reader(f)

  first = next(rows, None)  # None means the input had no rows at all
  if first is not None and first != header:
    raise RuntimeError('Expected CSV header %s' % header)

  names_by_id = {}
  for metric, field, field_type, _, field_id in rows:
    if field_type == 'string':
      # Plain metrics (single variable) have an empty field; metrics with
      # fields (multiple variables, for association analysis) are named
      # 'metric.field'.
      names_by_id[field_id] = '%s.%s' % (metric, field) if field else metric
  return names_by_id
84*2abb3134SXin Li
85*2abb3134SXin Li
86*2abb3134SXin Lidef _ReadVarSchema(f):
87*2abb3134SXin Li  """Given the rappor-vars.csv file, return a list of metric/var/type."""
88*2abb3134SXin Li  # metric -> list of (variable name, type)
89*2abb3134SXin Li  assoc_metrics = collections.defaultdict(list)
90*2abb3134SXin Li  params_lookup = {}
91*2abb3134SXin Li
92*2abb3134SXin Li  c = csv.reader(f)
93*2abb3134SXin Li  for i, row in enumerate(c):
94*2abb3134SXin Li    if i == 0:
95*2abb3134SXin Li      expected = ['metric', 'var', 'var_type', 'params']
96*2abb3134SXin Li      if row != expected:
97*2abb3134SXin Li        raise RuntimeError('Expected CSV header %s, got %s' % (expected, row))
98*2abb3134SXin Li      continue
99*2abb3134SXin Li
100*2abb3134SXin Li    metric, var, var_type, params = row
101*2abb3134SXin Li    if var == '':
102*2abb3134SXin Li      full_var_name = metric
103*2abb3134SXin Li    else:
104*2abb3134SXin Li      full_var_name = '%s.%s' % (metric, var)
105*2abb3134SXin Li      # Also group multi-dimensional reports
106*2abb3134SXin Li      assoc_metrics[metric].append((var, var_type))
107*2abb3134SXin Li
108*2abb3134SXin Li    params_lookup[full_var_name] = params
109*2abb3134SXin Li
110*2abb3134SXin Li  return assoc_metrics, params_lookup
111*2abb3134SXin Li
112*2abb3134SXin Li
class VarSchema(object):
  """In-memory view of rappor-vars.csv.

  Right now we use it for slightly different purposes for dist and assoc
  analysis.
  """

  def __init__(self, f, params_dir):
    """Load the schema.

    Args:
      f: CSV file object with the header 'metric,var,var_type,params'.
      params_dir: directory that params filenames are joined onto.
    """
    schema = _ReadVarSchema(f)
    self.assoc_metrics = schema[0]
    self.params_lookup = schema[1]
    self.params_dir = params_dir

  def GetParamsPath(self, var_name):
    """Return params_dir joined with '<params>.csv' for var_name.

    Raises:
      KeyError: if var_name is not in the schema.
    """
    csv_name = self.params_lookup[var_name] + '.csv'
    return os.path.join(self.params_dir, csv_name)

  def GetAssocMetrics(self):
    """Return the metric -> list of (var, var_type) mapping."""
    return self.assoc_metrics
129*2abb3134SXin Li
130*2abb3134SXin Li
def CountReports(f):
  """Return the sum of the integer in the first comma-separated column.

  Args:
    f: iterable of lines, e.g. an open counts file.

  Returns:
    The total over all lines; 0 if there are no lines.

  Raises:
    ValueError: if a line's first column is not an integer.
  """
  return sum(int(row.split(',', 1)[0]) for row in f)
137*2abb3134SXin Li
138*2abb3134SXin Li
# Matches a counts path such as '.../2015-01-01/<field id>_counts.csv' and
# captures (date directory, field ID).  The dot before 'csv' is escaped so that
# only a literal '.csv' is accepted, not an arbitrary character.
DIST_INPUT_PATH_RE = re.compile(r'.*/(\d+-\d+-\d+)/(\S+)_counts\.csv')
140*2abb3134SXin Li
141*2abb3134SXin Li
def DistInputIter(stdin):
  """Turn each counts path read from stdin into fields for an analysis task.

  Args:
    stdin: iterable of lines, each a path matching DIST_INPUT_PATH_RE.

  Yields:
    (counts_path, date, field_id), where counts_path is the line with
    surrounding whitespace stripped.

  Raises:
    RuntimeError: if a line does not match DIST_INPUT_PATH_RE.
  """
  for raw_line in stdin:
    match = DIST_INPUT_PATH_RE.match(raw_line)
    if match is None:
      raise RuntimeError('Invalid path %r' % raw_line)

    date, field_id = match.groups()
    yield raw_line.strip(), date, field_id
153*2abb3134SXin Li
154*2abb3134SXin Li
def DistTaskSpec(input_iter, field_id_lookup, var_schema, dist_maps, bad_c):
  """Yield one task spec row per counts file for single variable RAPPOR.

  Args:
    input_iter: iterable of (counts_path, date, field_id) tuples.
    field_id_lookup: dict from field ID -> field name.  If it is empty, the
      field ID itself is used as the field name.
    var_schema: object providing GetParamsPath(field_name).
    dist_maps: object providing GetMapPath(field_name).
    bad_c: csv writer, or None.  When set, a (date, field_id, num_reports) row
      is written for every input whose field ID is not in field_id_lookup.

  Yields:
    (num_reports, field_name, date, counts_path, params_path, map_path)

  After the input is exhausted, the number of unique field IDs is logged, and
  so is the number of failed field ID lookups if there were any.
  """
  num_bad = 0
  unique_ids = set()

  for counts_path, date, field_id in input_iter:
    unique_ids.add(field_id)

    # num_reports is used for filtering
    with open(counts_path) as f:
      num_reports = CountReports(f)

    # Look up field name from field ID
    if field_id_lookup:
      field_name = field_id_lookup.get(field_id)
      if field_name is None:
        # The metric id is the md5 hash of the name.  We can miss some, e.g. due
        # to debug builds.
        #
        # Count the failure whether or not a bad report is being written, so
        # that the summary log below is emitted either way.
        num_bad += 1
        if bad_c:
          bad_c.writerow((date, field_id, num_reports))
        continue
    else:
      field_name = field_id

    # NOTE: We could remove the params from the spec if decode_dist.R took the
    # --schema flag.  The var type is there too.
    params_path = var_schema.GetParamsPath(field_name)
    map_path = dist_maps.GetMapPath(field_name)

    yield num_reports, field_name, date, counts_path, params_path, map_path

  util.log('%d unique field IDs', len(unique_ids))
  if num_bad:
    util.log('Failed field ID -> field name lookup on %d files '
             '(check --field-ids file)', num_bad)
192*2abb3134SXin Li
193*2abb3134SXin Li
# Matches a reports path such as '.../2015-01-01/<metric name>_reports.csv' and
# captures (date directory, metric name).  The dot before 'csv' is escaped so
# that only a literal '.csv' is accepted, not an arbitrary character.
ASSOC_INPUT_PATH_RE = re.compile(r'.*/(\d+-\d+-\d+)/(\S+)_reports\.csv')
195*2abb3134SXin Li
196*2abb3134SXin Li
def AssocInputIter(stdin):
  """Turn each reports path read from stdin into fields for an analysis task.

  Args:
    stdin: iterable of lines, each a path matching ASSOC_INPUT_PATH_RE.

  Yields:
    (reports_path, date, metric_name), where reports_path is the line with
    surrounding whitespace stripped.

  Raises:
    RuntimeError: if a line does not match ASSOC_INPUT_PATH_RE.
  """
  for raw_line in stdin:
    match = ASSOC_INPUT_PATH_RE.match(raw_line)
    if match is None:
      raise RuntimeError('Invalid path %r' % raw_line)

    date, metric_name = match.groups()
    yield raw_line.strip(), date, metric_name
208*2abb3134SXin Li
209*2abb3134SXin Li
def CreateAssocVarPairs(rappor_metrics):
  """Return the pairs of variables that should be associated, per metric.

  For now just do all (string x boolean) analysis.

  Args:
    rappor_metrics: dict from metric -> list of (var_name, var_type).

  Returns:
    A defaultdict from metric -> list of (string var, boolean var) pairs.  A
    metric only gets an entry if it has at least one such pair.  Variables of
    any other type are logged and ignored.
  """
  var_pairs = collections.defaultdict(list)

  # items() rather than iteritems() so this runs on both Python 2 and 3.
  for metric, var_list in rappor_metrics.items():
    string_vars = []
    boolean_vars = []

    # Separate variables into strings and booleans
    for var_name, var_type in var_list:
      if var_type == 'string':
        string_vars.append(var_name)
      elif var_type == 'boolean':
        boolean_vars.append(var_name)
      else:
        util.log('Unknown type variable type %r', var_type)

    for s in string_vars:
      for b in boolean_vars:
        var_pairs[metric].append((s, b))
  return var_pairs
234*2abb3134SXin Li
235*2abb3134SXin Li
# For debugging
def PrintAssocVarPairs(var_pairs):
  """Print each metric followed by its tab-indented variable pairs to stdout.

  Args:
    var_pairs: dict from metric -> list of (var1, var2) pairs, as returned by
      CreateAssocVarPairs.
  """
  # Single-argument print(...) and items() behave the same on Python 2 and 3.
  for metric, pairs in var_pairs.items():
    print(metric)
    for var1, var2 in pairs:
      # Same bytes as the Python 2 statement `print '\t', var1, var2`, which
      # does not emit a separator space after the tab.
      print('\t%s %s' % (var1, var2))
242*2abb3134SXin Li
243*2abb3134SXin Li
def AssocTaskSpec(input_iter, var_pairs, dist_maps, output_base_dir, bad_c):
  """Yield one task spec row per (reports file, variable pair).

  Args:
    input_iter: iterable of (reports_path, date, metric_name) tuples.
    var_pairs: mapping from metric name -> list of (var1, var2) pairs.
    dist_maps: object providing GetMapPath(field_name).
    output_base_dir: root directory under which output directories are named.
    bad_c: not used by this function.

  Yields:
    (metric_name, date, reports_path, var1, var2, map1_path, output_dir)
  """
  # Flow:
  #
  # Long term: We should have assoc-analysis.xml, next to dist-analysis.xml?
  #
  # Short term: update_rappor.py should print every combination of string vs.
  # bool?  Or I guess we have it in rappor-vars.csv

  for reports_path, date, metric_name in input_iter:
    for first_var, second_var in var_pairs[metric_name]:
      # Assuming the first variable is a string.  TODO: Use an assoc file, not
      # dist_maps?
      first_map = dist_maps.GetMapPath('%s.%s' % (metric_name, first_var))

      # Don't use .. in filenames since it could be confusing: each '..' in
      # the second variable name becomes a single '_', so the pair
      # ('domain', 'flags..DID_PROCEED') is named 'domain_X_flags_DID_PROCEED'.
      pair_dir = '%s_X_%s' % (first_var, second_var.replace('..', '_'))

      yield (metric_name, date, reports_path, first_var, second_var, first_map,
             os.path.join(output_base_dir, metric_name, pair_dir, date))
266*2abb3134SXin Li
267*2abb3134SXin Li
def CreateOptionsParser():
  """Return an optparse.OptionParser for this script's flags.

  Every flag takes a string PATH argument and defaults to the empty string.
  """
  # (flag, attribute name on the parsed options, help text)
  path_flags = (
      ('--bad-report-out', 'bad_report',
       'Optionally write a report of input filenames with invalid field '
       'IDs to this file.'),
      ('--config-dir', 'config_dir',
       'Directory with metadata schema and params files to read.'),
      ('--map-dir', 'map_dir',
       'Directory with map files to read.'),
      ('--output-base-dir', 'output_base_dir',
       'Root of the directory tree where analysis output will be placed.'),
      ('--field-ids', 'field_ids',
       'Optional CSV file with field IDs (generally should not be used).'),
  )

  parser = optparse.OptionParser()
  for flag, dest, help_text in path_flags:
    parser.add_option(
        flag, dest=dest, metavar='PATH', type='str', default='',
        help=help_text)
  return parser
294*2abb3134SXin Li
295*2abb3134SXin Li
def main(argv):
  """Read input paths on stdin and print one task spec line per task.

  Args:
    argv: full argument vector, including the program name at index 0.  The
      first positional argument after it is the action, 'dist' or 'assoc'.

  Raises:
    RuntimeError: if the action is missing or invalid, or if one of
      --config-dir, --map-dir or --output-base-dir is not given.
  """
  (opts, argv) = CreateOptionsParser().parse_args(argv)

  # Report a missing action as a RuntimeError, like every other usage error,
  # rather than letting argv[1] fail with an IndexError.
  if len(argv) < 2:
    raise RuntimeError('An action is required (dist or assoc)')
  action = argv[1]

  if not opts.config_dir:
    raise RuntimeError('--config-dir is required')
  if not opts.map_dir:
    raise RuntimeError('--map-dir is required')
  if not opts.output_base_dir:
    raise RuntimeError('--output-base-dir is required')
  if action not in ('dist', 'assoc'):
    raise RuntimeError('Invalid action %r' % action)

  # This is shared between the two specs.
  path = os.path.join(opts.config_dir, 'dist-analysis.csv')
  with open(path) as f:
    dist_maps = DistMapLookup(f, opts.map_dir)

  path = os.path.join(opts.config_dir, 'rappor-vars.csv')
  with open(path) as f:
    var_schema = VarSchema(f, opts.config_dir)

  # Open the bad report only after all validation and config loading has
  # succeeded, and make sure it is closed (and flushed) on every exit path.
  bad_f = open(opts.bad_report, 'w') if opts.bad_report else None
  try:
    bad_c = csv.writer(bad_f) if bad_f else None

    if action == 'dist':
      if opts.field_ids:
        with open(opts.field_ids) as f:
          field_id_lookup = CreateFieldIdLookup(f)
      else:
        field_id_lookup = {}

      input_iter = DistInputIter(sys.stdin)
      for row in DistTaskSpec(input_iter, field_id_lookup, var_schema,
                              dist_maps, bad_c):
        # The spec is a series of space-separated tokens.
        tokens = row + (opts.output_base_dir,)
        print(' '.join(str(t) for t in tokens))

    else:  # action == 'assoc'
      # Parse input
      input_iter = AssocInputIter(sys.stdin)

      # Create M x N association tasks
      var_pairs = CreateAssocVarPairs(var_schema.GetAssocMetrics())

      # Now add the other constant stuff
      for row in AssocTaskSpec(
          input_iter, var_pairs, dist_maps, opts.output_base_dir, bad_c):

        num_reports = 0  # placeholder, not filtering yet
        tokens = (num_reports,) + row
        print(' '.join(str(t) for t in tokens))
  finally:
    if bad_f:
      bad_f.close()
354*2abb3134SXin Li
355*2abb3134SXin Li
if __name__ == '__main__':
  # `except X as e` and sys.stderr.write are valid on both Python 2.6+ and
  # Python 3, unlike `except X, e` and `print >>sys.stderr`.
  try:
    main(sys.argv)
  except IOError as e:
    if e.errno != errno.EPIPE:  # ignore broken pipe
      raise
  except RuntimeError as e:
    # Usage and input errors: report on stderr and exit non-zero.
    sys.stderr.write('FATAL: %s\n' % e)
    sys.exit(1)
365