#!/usr/bin/python
"""Read a list of 'counts' paths on stdin, and write a task spec on stdout.

Each line represents a task, or R process invocation.  The params on each line
are passed to ./dist.sh decode-many or ./assoc.sh decode-many.

Usage: the first positional argument is the action, either 'dist' or 'assoc'.
--config-dir, --map-dir and --output-base-dir are required for both actions.
"""

import collections
import csv
import errno
import optparse
import os
import pprint
import re
import sys

import util


def _CheckCsvHeader(row, expected):
  """Raise RuntimeError unless the first CSV row is exactly `expected`."""
  if row != expected:
    raise RuntimeError('Expected CSV header %s, got %s' % (expected, row))


def _ReadDistMaps(f):
  """Parse a CSV with header 'var,map_filename' into a dict.

  Args:
    f: open file (or any iterable of lines) containing the CSV.

  Returns:
    A dictionary from var name -> map filename.

  Raises:
    RuntimeError: if the header row is not ['var', 'map_filename'].
  """
  dist_maps = {}
  for i, row in enumerate(csv.reader(f)):
    if i == 0:
      _CheckCsvHeader(row, ['var', 'map_filename'])
      continue  # skip header

    var_name, map_filename = row
    dist_maps[var_name] = map_filename
  return dist_maps


class DistMapLookup(object):
  """Create a dictionary of var -> map to analyze against.

  TODO: Support a LIST of maps.  Users should be able to specify more than one.
  """
  def __init__(self, f, map_dir):
    """Read the var -> map filename CSV from f; maps live under map_dir."""
    self.dist_maps = _ReadDistMaps(f)
    self.map_dir = map_dir

  def GetMapPath(self, var_name):
    """Return map_dir joined with the map filename for var_name.

    Raises:
      KeyError: if var_name was not listed in the CSV.
    """
    filename = self.dist_maps[var_name]
    return os.path.join(self.map_dir, filename)


def CreateFieldIdLookup(f):
  """Create a dictionary that specifies single variable analysis each var.

  Args:
    f: open file (or any iterable of lines) containing a CSV with the header
      'metric,field,field_type,params,field_id'.

  Returns:
    A dictionary from field ID -> full field name

  Raises:
    RuntimeError: if the header row is not the expected one.

  NOTE: Right now we're only doing single variable analysis for strings, so we
  don't have the "type".
  """
  field_id_lookup = {}
  for i, row in enumerate(csv.reader(f)):
    if i == 0:
      _CheckCsvHeader(
          row, ['metric', 'field', 'field_type', 'params', 'field_id'])
      continue  # skip header

    metric, field, field_type, _, field_id = row

    # Only string fields are analyzed; rows of any other type are dropped.
    if field_type != 'string':
      continue

    # Paper over the difference between plain metrics (single variable) and
    # metrics with fields (multiple variables, for association analysis).
    if field:
      full_field_name = '%s.%s' % (metric, field)
    else:
      full_field_name = metric

    field_id_lookup[field_id] = full_field_name
  return field_id_lookup


def _ReadVarSchema(f):
  """Given the rappor-vars.csv file, return a list of metric/var/type.

  Args:
    f: open file (or any iterable of lines) containing a CSV with the header
      'metric,var,var_type,params'.

  Returns:
    A pair (assoc_metrics, params_lookup):
      assoc_metrics: defaultdict from metric -> list of (var, var_type), with
        an entry appended only for rows whose var column is non-empty.
      params_lookup: dict from full var name ('metric' or 'metric.var') ->
        the params column.

  Raises:
    RuntimeError: if the header row is not the expected one.
  """
  # metric -> list of (variable name, type)
  assoc_metrics = collections.defaultdict(list)
  params_lookup = {}

  for i, row in enumerate(csv.reader(f)):
    if i == 0:
      _CheckCsvHeader(row, ['metric', 'var', 'var_type', 'params'])
      continue  # skip header

    metric, var, var_type, params = row
    if var == '':
      full_var_name = metric
    else:
      full_var_name = '%s.%s' % (metric, var)
      # Also group multi-dimensional reports
      assoc_metrics[metric].append((var, var_type))

    params_lookup[full_var_name] = params

  return assoc_metrics, params_lookup


class VarSchema(object):
  """Object representing rappor-vars.csv.

  Right now we use it for slightly different purposes for dist and assoc
  analysis.
  """
  def __init__(self, f, params_dir):
    """Read the schema CSV from f; params files live under params_dir."""
    self.assoc_metrics, self.params_lookup = _ReadVarSchema(f)
    self.params_dir = params_dir

  def GetParamsPath(self, var_name):
    """Return params_dir joined with '<params>.csv' for var_name.

    Raises:
      KeyError: if var_name was not listed in the schema.
    """
    filename = self.params_lookup[var_name]
    return os.path.join(self.params_dir, filename + '.csv')

  def GetAssocMetrics(self):
    """Return the metric -> [(var, var_type), ...] mapping."""
    return self.assoc_metrics


def CountReports(f):
  """Sum the integer in the first comma-separated column of every line.

  Blank lines are skipped, so a stray empty line does not abort the run.

  Args:
    f: open file (or any iterable of lines).

  Returns:
    The total as an int; 0 for empty input.

  Raises:
    ValueError: if the first column of a non-blank line is not an integer.
  """
  num_reports = 0
  for line in f:
    if not line.strip():
      continue
    first_col = line.split(',')[0]
    num_reports += int(first_col)
  return num_reports


DIST_INPUT_PATH_RE = re.compile(r'.*/(\d+-\d+-\d+)/(\S+)_counts.csv')


def DistInputIter(stdin):
  """Read lines from stdin and extract fields to construct analysis tasks.

  Args:
    stdin: iterable of lines, each a path matching DIST_INPUT_PATH_RE.

  Yields:
    (counts_path, date, field_id), where counts_path is the stripped line.

  Raises:
    RuntimeError: on the first line that does not match the pattern.
  """
  for line in stdin:
    m = DIST_INPUT_PATH_RE.match(line)
    if not m:
      raise RuntimeError('Invalid path %r' % line)

    counts_path = line.strip()
    date, field_id = m.groups()

    yield counts_path, date, field_id


def DistTaskSpec(input_iter, field_id_lookup, var_schema, dist_maps, bad_c):
  """Yield task spec rows for single variable RAPPOR.

  Args:
    input_iter: iterable of (counts_path, date, field_id).
    field_id_lookup: dict of field ID -> field name.  If empty, the field ID
      itself is used as the field name.
    var_schema: object with GetParamsPath(field_name).
    dist_maps: object with GetMapPath(field_name).
    bad_c: csv writer that receives (date, field_id, num_reports) for every
      field ID missing from field_id_lookup, or None to skip that report.

  Yields:
    (num_reports, field_name, date, counts_path, params_path, map_path)

  Each counts file is opened and read to compute num_reports.  Summary
  messages are logged only once the input has been fully consumed.
  """
  num_bad = 0
  unique_ids = set()

  for counts_path, date, field_id in input_iter:
    unique_ids.add(field_id)

    # num_reports is used for filtering
    with open(counts_path) as f:
      num_reports = CountReports(f)

    # Look up field name from field ID
    if field_id_lookup:
      field_name = field_id_lookup.get(field_id)
      if field_name is None:
        # The metric id is the md5 hash of the name.  We can miss some, e.g.
        # due to debug builds.
        if bad_c:
          bad_c.writerow((date, field_id, num_reports))
        num_bad += 1
        continue
    else:
      field_name = field_id

    # NOTE: We could remove the params from the spec if decode_dist.R took the
    # --schema flag.  The var type is there too.
    params_path = var_schema.GetParamsPath(field_name)
    map_path = dist_maps.GetMapPath(field_name)

    yield num_reports, field_name, date, counts_path, params_path, map_path

  util.log('%d unique field IDs', len(unique_ids))
  if num_bad:
    util.log('Failed field ID -> field name lookup on %d files '
             '(check --field-ids file)', num_bad)


ASSOC_INPUT_PATH_RE = re.compile(r'.*/(\d+-\d+-\d+)/(\S+)_reports.csv')


def AssocInputIter(stdin):
  """Read lines from stdin and extract fields to construct analysis tasks.

  Args:
    stdin: iterable of lines, each a path matching ASSOC_INPUT_PATH_RE.

  Yields:
    (reports_path, date, metric_name), where reports_path is the stripped
    line.

  Raises:
    RuntimeError: on the first line that does not match the pattern.
  """
  for line in stdin:
    m = ASSOC_INPUT_PATH_RE.match(line)
    if not m:
      raise RuntimeError('Invalid path %r' % line)

    reports_path = line.strip()
    date, metric_name = m.groups()

    yield reports_path, date, metric_name


def CreateAssocVarPairs(rappor_metrics):
  """Yield a list of pairs of variables that should be associated.

  For now just do all (string x boolean) analysis.

  Args:
    rappor_metrics: dict from metric -> list of (var_name, var_type).

  Returns:
    A defaultdict from metric -> list of (string var, boolean var) pairs.
    Variables of any other type are logged and ignored.
  """
  var_pairs = collections.defaultdict(list)

  # .items() rather than .iteritems() so this runs on Python 2 and 3 alike.
  for metric, var_list in rappor_metrics.items():
    string_vars = []
    boolean_vars = []

    # Separate variables into strings and booleans
    for var_name, var_type in var_list:
      if var_type == 'string':
        string_vars.append(var_name)
      elif var_type == 'boolean':
        boolean_vars.append(var_name)
      else:
        util.log('Unknown type variable type %r', var_type)

    for s in string_vars:
      for b in boolean_vars:
        var_pairs[metric].append((s, b))
  return var_pairs


# For debugging
def PrintAssocVarPairs(var_pairs):
  """Print each metric followed by its tab-indented variable pairs."""
  for metric, var_list in var_pairs.items():
    print(metric)
    for var_name, var_type in var_list:
      # Single-argument print() produces the same output on Python 2 and 3.
      print('\t%s %s' % (var_name, var_type))


def AssocTaskSpec(input_iter, var_pairs, dist_maps, output_base_dir, bad_c):
  """Yield task spec rows for multiple variable RAPPOR.

  Args:
    input_iter: iterable of (reports_path, date, metric_name).
    var_pairs: mapping from metric name -> list of (var1, var2) pairs.
    dist_maps: object with GetMapPath(field_name).
    output_base_dir: root directory under which output_dir is built.
    bad_c: accepted for symmetry with DistTaskSpec; currently unused.

  Yields:
    (metric_name, date, reports_path, var1, var2, map1_path, output_dir)
  """
  # Flow:
  #
  # Long term: We should have assoc-analysis.xml, next to dist-analysis.xml?
  #
  # Short term: update_rappor.py should print every combination of string vs.
  # bool?  Or I guess we have it in rappor-vars.csv

  for reports_path, date, metric_name in input_iter:
    pairs = var_pairs[metric_name]
    for var1, var2 in pairs:
      # Assuming var1 is a string.  TODO: Use an assoc file, not dist_maps?
      field1_name = '%s.%s' % (metric_name, var1)
      map1_path = dist_maps.GetMapPath(field1_name)

      # e.g. domain_X_flags__DID_PROCEED
      # Don't use .. in filenames since it could be confusing.
      pair_name = '%s_X_%s' % (var1, var2.replace('..', '_'))
      output_dir = os.path.join(output_base_dir, metric_name, pair_name, date)

      yield metric_name, date, reports_path, var1, var2, map1_path, output_dir


def CreateOptionsParser():
  """Return the optparse.OptionParser for this script's flags."""
  p = optparse.OptionParser()

  p.add_option(
      '--bad-report-out', dest='bad_report', metavar='PATH', type='str',
      default='',
      help='Optionally write a report of input filenames with invalid field '
           'IDs to this file.')
  p.add_option(
      '--config-dir', dest='config_dir', metavar='PATH', type='str',
      default='',
      help='Directory with metadata schema and params files to read.')
  p.add_option(
      '--map-dir', dest='map_dir', metavar='PATH', type='str',
      default='',
      help='Directory with map files to read.')
  p.add_option(
      '--output-base-dir', dest='output_base_dir', metavar='PATH', type='str',
      default='',
      help='Root of the directory tree where analysis output will be placed.')
  p.add_option(
      '--field-ids', dest='field_ids', metavar='PATH', type='str',
      default='',
      help='Optional CSV file with field IDs (generally should not be used).')

  return p


def _PrintTaskSpec(action, opts, bad_c):
  """Load the config files and print the spec for `action` to stdout."""
  # This is shared between the two specs.
  path = os.path.join(opts.config_dir, 'dist-analysis.csv')
  with open(path) as f:
    dist_maps = DistMapLookup(f, opts.map_dir)

  path = os.path.join(opts.config_dir, 'rappor-vars.csv')
  with open(path) as f:
    var_schema = VarSchema(f, opts.config_dir)

  if action == 'dist':
    if opts.field_ids:
      with open(opts.field_ids) as f:
        field_id_lookup = CreateFieldIdLookup(f)
    else:
      field_id_lookup = {}

    input_iter = DistInputIter(sys.stdin)
    for row in DistTaskSpec(input_iter, field_id_lookup, var_schema, dist_maps,
                            bad_c):
      # The spec is a series of space-separated tokens.
      tokens = row + (opts.output_base_dir,)
      print(' '.join(str(t) for t in tokens))

  elif action == 'assoc':
    # Parse input
    input_iter = AssocInputIter(sys.stdin)

    # Create M x N association tasks
    var_pairs = CreateAssocVarPairs(var_schema.GetAssocMetrics())

    # Now add the other constant stuff
    for row in AssocTaskSpec(
        input_iter, var_pairs, dist_maps, opts.output_base_dir, bad_c):

      num_reports = 0  # placeholder, not filtering yet
      tokens = (num_reports,) + row
      print(' '.join(str(t) for t in tokens))

  else:
    raise RuntimeError('Invalid action %r' % action)


def main(argv):
  """Parse flags, then print the 'dist' or 'assoc' task spec to stdout.

  Args:
    argv: full argument vector, including the program name at index 0.

  Raises:
    RuntimeError: if the action or a required flag is missing, the action is
      unknown, or an input file is malformed.
  """
  (opts, argv) = CreateOptionsParser().parse_args(argv)

  # argv[0] is the program name, so the action is the second element.
  if len(argv) < 2:
    raise RuntimeError("Expected an action: 'dist' or 'assoc'")
  action = argv[1]

  if not opts.config_dir:
    raise RuntimeError('--config-dir is required')
  if not opts.map_dir:
    raise RuntimeError('--map-dir is required')
  if not opts.output_base_dir:
    raise RuntimeError('--output-base-dir is required')

  # Open the bad report only after validation so that a usage error does not
  # leave an empty file behind, and always close it so rows are flushed.
  bad_f = None
  bad_c = None
  if opts.bad_report:
    bad_f = open(opts.bad_report, 'w')
    bad_c = csv.writer(bad_f)

  try:
    _PrintTaskSpec(action, opts, bad_c)
  finally:
    if bad_f is not None:
      bad_f.close()


if __name__ == '__main__':
  try:
    main(sys.argv)
  except IOError as e:
    if e.errno != errno.EPIPE:  # ignore broken pipe
      raise
  except RuntimeError as e:
    sys.stderr.write('FATAL: %s\n' % e)
    sys.exit(1)