#!/usr/bin/python """Read a list of 'counts' paths on stdin, and write a task spec on stdout. Each line represents a task, or R process invocation. The params on each line are passed to ./dist.sh decode-many or ./assoc.sh decode-many. """ import collections import csv import errno import optparse import os import pprint import re import sys import util def _ReadDistMaps(f): dist_maps = {} c = csv.reader(f) for i, row in enumerate(c): if i == 0: expected = ['var', 'map_filename'] if row != expected: raise RuntimeError('Expected CSV header %s' % expected) continue # skip header var_name, map_filename = row dist_maps[var_name] = map_filename return dist_maps class DistMapLookup(object): """Create a dictionary of var -> map to analyze against. TODO: Support a LIST of maps. Users should be able to specify more than one. """ def __init__(self, f, map_dir): self.dist_maps = _ReadDistMaps(f) self.map_dir = map_dir def GetMapPath(self, var_name): filename = self.dist_maps[var_name] return os.path.join(self.map_dir, filename) def CreateFieldIdLookup(f): """Create a dictionary that specifies single variable analysis each var. Args: config_dir: directory of metadata, output by update_rappor.par Returns: A dictionary from field ID -> full field name NOTE: Right now we're only doing single variable analysis for strings, so we don't have the "type". """ field_id_lookup = {} c = csv.reader(f) for i, row in enumerate(c): if i == 0: expected = ['metric', 'field', 'field_type', 'params', 'field_id'] if row != expected: raise RuntimeError('Expected CSV header %s' % expected) continue metric, field, field_type, _, field_id = row if field_type != 'string': continue # Paper over the difference between plain metrics (single variable) and # metrics with fields (multiple variables, for association analysis). if field: full_field_name = '%s.%s' % (metric, field) else: full_field_name = metric field_id_lookup[field_id] = full_field_name return field_id_lookup def _ReadVarSchema(f): """Given the rappor-vars.csv file, return a list of metric/var/type.""" # metric -> list of (variable name, type) assoc_metrics = collections.defaultdict(list) params_lookup = {} c = csv.reader(f) for i, row in enumerate(c): if i == 0: expected = ['metric', 'var', 'var_type', 'params'] if row != expected: raise RuntimeError('Expected CSV header %s, got %s' % (expected, row)) continue metric, var, var_type, params = row if var == '': full_var_name = metric else: full_var_name = '%s.%s' % (metric, var) # Also group multi-dimensional reports assoc_metrics[metric].append((var, var_type)) params_lookup[full_var_name] = params return assoc_metrics, params_lookup class VarSchema(object): """Object representing rappor-vars.csv. Right now we use it for slightly different purposes for dist and assoc analysis. """ def __init__(self, f, params_dir): self.assoc_metrics, self.params_lookup = _ReadVarSchema(f) self.params_dir = params_dir def GetParamsPath(self, var_name): filename = self.params_lookup[var_name] return os.path.join(self.params_dir, filename + '.csv') def GetAssocMetrics(self): return self.assoc_metrics def CountReports(f): num_reports = 0 for line in f: first_col = line.split(',')[0] num_reports += int(first_col) return num_reports DIST_INPUT_PATH_RE = re.compile(r'.*/(\d+-\d+-\d+)/(\S+)_counts.csv') def DistInputIter(stdin): """Read lines from stdin and extract fields to construct analysis tasks.""" for line in stdin: m = DIST_INPUT_PATH_RE.match(line) if not m: raise RuntimeError('Invalid path %r' % line) counts_path = line.strip() date, field_id = m.groups() yield counts_path, date, field_id def DistTaskSpec(input_iter, field_id_lookup, var_schema, dist_maps, bad_c): """Print task spec for single variable RAPPOR to stdout.""" num_bad = 0 unique_ids = set() for counts_path, date, field_id in input_iter: unique_ids.add(field_id) # num_reports is used for filtering with open(counts_path) as f: num_reports = CountReports(f) # Look up field name from field ID if field_id_lookup: field_name = field_id_lookup.get(field_id) if field_name is None: # The metric id is the md5 hash of the name. We can miss some, e.g. due # to debug builds. if bad_c: bad_c.writerow((date, field_id, num_reports)) num_bad += 1 continue else: field_name = field_id # NOTE: We could remove the params from the spec if decode_dist.R took the # --schema flag. The var type is there too. params_path = var_schema.GetParamsPath(field_name) map_path= dist_maps.GetMapPath(field_name) yield num_reports, field_name, date, counts_path, params_path, map_path util.log('%d unique field IDs', len(unique_ids)) if num_bad: util.log('Failed field ID -> field name lookup on %d files ' '(check --field-ids file)', num_bad) ASSOC_INPUT_PATH_RE = re.compile(r'.*/(\d+-\d+-\d+)/(\S+)_reports.csv') def AssocInputIter(stdin): """Read lines from stdin and extract fields to construct analysis tasks.""" for line in stdin: m = ASSOC_INPUT_PATH_RE.match(line) if not m: raise RuntimeError('Invalid path %r' % line) reports_path = line.strip() date, metric_name = m.groups() yield reports_path, date, metric_name def CreateAssocVarPairs(rappor_metrics): """Yield a list of pairs of variables that should be associated. For now just do all (string x boolean) analysis. """ var_pairs = collections.defaultdict(list) for metric, var_list in rappor_metrics.iteritems(): string_vars = [] boolean_vars = [] # Separate variables into strings and booleans for var_name, var_type in var_list: if var_type == 'string': string_vars.append(var_name) elif var_type == 'boolean': boolean_vars.append(var_name) else: util.log('Unknown type variable type %r', var_type) for s in string_vars: for b in boolean_vars: var_pairs[metric].append((s, b)) return var_pairs # For debugging def PrintAssocVarPairs(var_pairs): for metric, var_list in var_pairs.iteritems(): print metric for var_name, var_type in var_list: print '\t', var_name, var_type def AssocTaskSpec(input_iter, var_pairs, dist_maps, output_base_dir, bad_c): """Print the task spec for multiple variable RAPPOR to stdout.""" # Flow: # # Long term: We should have assoc-analysis.xml, next to dist-analysis.xml? # # Short term: update_rappor.py should print every combination of string vs. # bool? Or I guess we have it in rappor-vars.csv for reports_path, date, metric_name in input_iter: pairs = var_pairs[metric_name] for var1, var2 in pairs: # Assuming var1 is a string. TODO: Use an assoc file, not dist_maps? field1_name = '%s.%s' % (metric_name, var1) map1_path = dist_maps.GetMapPath(field1_name) # e.g. domain_X_flags__DID_PROCEED # Don't use .. in filenames since it could be confusing. pair_name = '%s_X_%s' % (var1, var2.replace('..', '_')) output_dir = os.path.join(output_base_dir, metric_name, pair_name, date) yield metric_name, date, reports_path, var1, var2, map1_path, output_dir def CreateOptionsParser(): p = optparse.OptionParser() p.add_option( '--bad-report-out', dest='bad_report', metavar='PATH', type='str', default='', help='Optionally write a report of input filenames with invalid field ' 'IDs to this file.') p.add_option( '--config-dir', dest='config_dir', metavar='PATH', type='str', default='', help='Directory with metadata schema and params files to read.') p.add_option( '--map-dir', dest='map_dir', metavar='PATH', type='str', default='', help='Directory with map files to read.') p.add_option( '--output-base-dir', dest='output_base_dir', metavar='PATH', type='str', default='', help='Root of the directory tree where analysis output will be placed.') p.add_option( '--field-ids', dest='field_ids', metavar='PATH', type='str', default='', help='Optional CSV file with field IDs (generally should not be used).') return p def main(argv): (opts, argv) = CreateOptionsParser().parse_args(argv) if opts.bad_report: bad_f = open(opts.bad_report, 'w') bad_c = csv.writer(bad_f) else: bad_c = None action = argv[1] if not opts.config_dir: raise RuntimeError('--config-dir is required') if not opts.map_dir: raise RuntimeError('--map-dir is required') if not opts.output_base_dir: raise RuntimeError('--output-base-dir is required') # This is shared between the two specs. path = os.path.join(opts.config_dir, 'dist-analysis.csv') with open(path) as f: dist_maps = DistMapLookup(f, opts.map_dir) path = os.path.join(opts.config_dir, 'rappor-vars.csv') with open(path) as f: var_schema = VarSchema(f, opts.config_dir) if action == 'dist': if opts.field_ids: with open(opts.field_ids) as f: field_id_lookup = CreateFieldIdLookup(f) else: field_id_lookup = {} input_iter = DistInputIter(sys.stdin) for row in DistTaskSpec(input_iter, field_id_lookup, var_schema, dist_maps, bad_c): # The spec is a series of space-separated tokens. tokens = row + (opts.output_base_dir,) print ' '.join(str(t) for t in tokens) elif action == 'assoc': # Parse input input_iter = AssocInputIter(sys.stdin) # Create M x N association tasks var_pairs = CreateAssocVarPairs(var_schema.GetAssocMetrics()) # Now add the other constant stuff for row in AssocTaskSpec( input_iter, var_pairs, dist_maps, opts.output_base_dir, bad_c): num_reports = 0 # placeholder, not filtering yet tokens = (num_reports,) + row print ' '.join(str(t) for t in tokens) else: raise RuntimeError('Invalid action %r' % action) if __name__ == '__main__': try: main(sys.argv) except IOError, e: if e.errno != errno.EPIPE: # ignore broken pipe raise except RuntimeError, e: print >>sys.stderr, 'FATAL: %s' % e sys.exit(1)