#!/usr/bin/python
"""Read a list of 'counts' paths on stdin, and write a task spec on stdout.

Each line represents a task, or R process invocation.  The params on each line
are passed to ./dist.sh decode-many or ./assoc.sh decode-many.

Usage: the first positional argument is the action, 'dist' or 'assoc'.  The
--config-dir, --map-dir and --output-base-dir flags are required.

The syntax used here is valid on both Python 2.6+ and Python 3.
"""

import collections
import csv
import errno
import optparse
import os
import pprint
import re
import sys

import util


def _ReadDistMaps(f):
  """Parse a CSV of (var, map_filename) rows.

  Args:
    f: open file (or any iterable of lines) whose first row is the header
       'var,map_filename'.

  Returns:
    A dictionary from variable name -> map filename.

  Raises:
    RuntimeError: if the header row is not the expected one.
  """
  dist_maps = {}
  c = csv.reader(f)
  for i, row in enumerate(c):
    if i == 0:
      expected = ['var', 'map_filename']
      if row != expected:
        # Report the offending row too, like _ReadVarSchema does.
        raise RuntimeError(
            'Expected CSV header %s, got %s' % (expected, row))
      continue  # skip header

    var_name, map_filename = row
    dist_maps[var_name] = map_filename
  return dist_maps


class DistMapLookup(object):
  """Create a dictionary of var -> map to analyze against.

  TODO: Support a LIST of maps.  Users should be able to specify more than one.
  """
  def __init__(self, f, map_dir):
    """Read the var -> map filename table from f.

    Args:
      f: open CSV file with header 'var,map_filename'.
      map_dir: directory that the map filenames are relative to.
    """
    self.dist_maps = _ReadDistMaps(f)
    self.map_dir = map_dir

  def GetMapPath(self, var_name):
    """Return the map path (under map_dir) for var_name.

    Raises:
      KeyError: if var_name was not listed in the CSV.
    """
    filename = self.dist_maps[var_name]
    return os.path.join(self.map_dir, filename)


def CreateFieldIdLookup(f):
  """Create a dictionary from field ID to full field name.

  Only rows whose field_type is 'string' are included.

  Args:
    f: open CSV file with header 'metric,field,field_type,params,field_id'.

  Returns:
    A dictionary from field ID -> full field name.  The full field name is
    'metric.field' when the field column is non-empty, otherwise just 'metric'.

  Raises:
    RuntimeError: if the header row is not the expected one.

  NOTE: Right now we're only doing single variable analysis for strings, so we
  don't have the "type".
  """
  field_id_lookup = {}
  c = csv.reader(f)
  for i, row in enumerate(c):
    if i == 0:
      expected = ['metric', 'field', 'field_type', 'params', 'field_id']
      if row != expected:
        raise RuntimeError(
            'Expected CSV header %s, got %s' % (expected, row))
      continue

    metric, field, field_type, _, field_id = row

    if field_type != 'string':
      continue

    # Paper over the difference between plain metrics (single variable) and
    # metrics with fields (multiple variables, for association analysis).
    if field:
      full_field_name = '%s.%s' % (metric, field)
    else:
      full_field_name = metric

    field_id_lookup[field_id] = full_field_name
  return field_id_lookup


def _ReadVarSchema(f):
  """Parse the rappor-vars.csv file.

  Args:
    f: open CSV file with header 'metric,var,var_type,params'.

  Returns:
    A tuple (assoc_metrics, params_lookup):
      assoc_metrics: dict of metric -> list of (variable name, type).  Only
        rows with a non-empty 'var' column are added here.
      params_lookup: dict of full variable name -> params name.  The full name
        is 'metric.var', or just 'metric' when 'var' is empty.

  Raises:
    RuntimeError: if the header row is not the expected one.
  """
  # metric -> list of (variable name, type)
  assoc_metrics = collections.defaultdict(list)
  params_lookup = {}

  c = csv.reader(f)
  for i, row in enumerate(c):
    if i == 0:
      expected = ['metric', 'var', 'var_type', 'params']
      if row != expected:
        raise RuntimeError('Expected CSV header %s, got %s' % (expected, row))
      continue

    metric, var, var_type, params = row
    if var == '':
      full_var_name = metric
    else:
      full_var_name = '%s.%s' % (metric, var)
      # Also group multi-dimensional reports
      assoc_metrics[metric].append((var, var_type))

    params_lookup[full_var_name] = params

  return assoc_metrics, params_lookup


class VarSchema(object):
  """Object representing rappor-vars.csv.

  Right now we use it for slightly different purposes for dist and assoc
  analysis.
  """
  def __init__(self, f, params_dir):
    """Read the schema from f.

    Args:
      f: open CSV file with header 'metric,var,var_type,params'.
      params_dir: directory containing the '<params>.csv' files.
    """
    self.assoc_metrics, self.params_lookup = _ReadVarSchema(f)
    self.params_dir = params_dir

  def GetParamsPath(self, var_name):
    """Return the path of the params CSV for var_name.

    Raises:
      KeyError: if var_name is not in the schema.
    """
    filename = self.params_lookup[var_name]
    return os.path.join(self.params_dir, filename + '.csv')

  def GetAssocMetrics(self):
    """Return the dict of metric -> list of (variable name, type)."""
    return self.assoc_metrics


def CountReports(f):
  """Sum the integer in the first comma-separated column of every line of f.

  Raises:
    ValueError: if the first column of a line is not an integer.
  """
  num_reports = 0
  for line in f:
    first_col = line.split(',')[0]
    num_reports += int(first_col)
  return num_reports


# <anything>/<date>/<field id>_counts.csv.  The '.' before 'csv' is escaped so
# that it only matches a literal dot.
DIST_INPUT_PATH_RE = re.compile(r'.*/(\d+-\d+-\d+)/(\S+)_counts\.csv')


def DistInputIter(stdin):
  """Read lines from stdin and extract fields to construct analysis tasks.

  Args:
    stdin: iterable of lines, each a path matching DIST_INPUT_PATH_RE.

  Yields:
    Tuples of (counts_path, date, field_id).

  Raises:
    RuntimeError: if a line does not match DIST_INPUT_PATH_RE.
  """
  for line in stdin:
    m = DIST_INPUT_PATH_RE.match(line)
    if not m:
      raise RuntimeError('Invalid path %r' % line)

    counts_path = line.strip()
    date, field_id = m.groups()

    yield counts_path, date, field_id


def DistTaskSpec(input_iter, field_id_lookup, var_schema, dist_maps, bad_c):
  """Generate task spec rows for single variable RAPPOR.

  Args:
    input_iter: iterable of (counts_path, date, field_id), e.g. DistInputIter.
    field_id_lookup: dict of field ID -> field name.  If empty, the field ID
      from the path is used as the field name directly.
    var_schema: VarSchema, used to find the params path.
    dist_maps: DistMapLookup, used to find the map path.
    bad_c: csv writer that receives a (date, field_id, num_reports) row for
      every field ID missing from field_id_lookup, or None.

  Yields:
    Tuples of (num_reports, field_name, date, counts_path, params_path,
    map_path).  Inputs whose field ID can't be resolved are skipped.
  """
  num_bad = 0
  unique_ids = set()

  for counts_path, date, field_id in input_iter:
    unique_ids.add(field_id)

    # num_reports is used for filtering
    with open(counts_path) as f:
      num_reports = CountReports(f)

    # Look up field name from field ID
    if field_id_lookup:
      field_name = field_id_lookup.get(field_id)
      if field_name is None:
        # The metric id is the md5 hash of the name.  We can miss some, e.g. due
        # to debug builds.
        if bad_c:
          bad_c.writerow((date, field_id, num_reports))
        num_bad += 1
        continue
    else:
      field_name = field_id

    # NOTE: We could remove the params from the spec if decode_dist.R took the
    # --schema flag.  The var type is there too.
    params_path = var_schema.GetParamsPath(field_name)
    map_path = dist_maps.GetMapPath(field_name)

    yield num_reports, field_name, date, counts_path, params_path, map_path

  # Summary is logged only once the input has been fully consumed.
  util.log('%d unique field IDs', len(unique_ids))
  if num_bad:
    util.log('Failed field ID -> field name lookup on %d files '
             '(check --field-ids file)', num_bad)


# <anything>/<date>/<metric name>_reports.csv, with a literal '.' before 'csv'.
ASSOC_INPUT_PATH_RE = re.compile(r'.*/(\d+-\d+-\d+)/(\S+)_reports\.csv')


def AssocInputIter(stdin):
  """Read lines from stdin and extract fields to construct analysis tasks.

  Args:
    stdin: iterable of lines, each a path matching ASSOC_INPUT_PATH_RE.

  Yields:
    Tuples of (reports_path, date, metric_name).

  Raises:
    RuntimeError: if a line does not match ASSOC_INPUT_PATH_RE.
  """
  for line in stdin:
    m = ASSOC_INPUT_PATH_RE.match(line)
    if not m:
      raise RuntimeError('Invalid path %r' % line)

    reports_path = line.strip()
    date, metric_name = m.groups()

    yield reports_path, date, metric_name


def CreateAssocVarPairs(rappor_metrics):
  """Return the pairs of variables that should be associated, per metric.

  For now just do all (string x boolean) analysis.

  Args:
    rappor_metrics: dict of metric -> list of (variable name, type).

  Returns:
    A defaultdict(list) of metric -> list of (string var, boolean var) pairs.
    Variables of any other type are logged and ignored.
  """
  var_pairs = collections.defaultdict(list)

  for metric, var_list in rappor_metrics.items():
    string_vars = []
    boolean_vars = []

    # Separate variables into strings and booleans
    for var_name, var_type in var_list:
      if var_type == 'string':
        string_vars.append(var_name)
      elif var_type == 'boolean':
        boolean_vars.append(var_name)
      else:
        util.log('Unknown type variable type %r', var_type)

    for s in string_vars:
      for b in boolean_vars:
        var_pairs[metric].append((s, b))
  return var_pairs


# For debugging
def PrintAssocVarPairs(var_pairs):
  """Print each metric followed by its tab-indented variable pairs."""
  for metric, pair_list in var_pairs.items():
    print(metric)
    for var1, var2 in pair_list:
      # Single-argument print() behaves the same on Python 2 and 3.
      print('\t%s %s' % (var1, var2))


def AssocTaskSpec(input_iter, var_pairs, dist_maps, output_base_dir, bad_c):
  """Generate task spec rows for multiple variable RAPPOR.

  Args:
    input_iter: iterable of (reports_path, date, metric_name), e.g.
      AssocInputIter.
    var_pairs: dict of metric -> list of (var1, var2), e.g. from
      CreateAssocVarPairs.
    dist_maps: DistMapLookup, used to find the map for var1.
    output_base_dir: root of the output directory tree.
    bad_c: unused here; accepted for symmetry with DistTaskSpec.

  Yields:
    Tuples of (metric_name, date, reports_path, var1, var2, map1_path,
    output_dir), one per variable pair of each input.
  """
  # Flow:
  #
  # Long term: We should have assoc-analysis.xml, next to dist-analysis.xml?
  #
  # Short term: update_rappor.py should print every combination of string vs.
  # bool?  Or I guess we have it in rappor-vars.csv

  for reports_path, date, metric_name in input_iter:
    pairs = var_pairs[metric_name]
    for var1, var2 in pairs:
      # Assuming var1 is a string.  TODO: Use an assoc file, not dist_maps?
      field1_name = '%s.%s' % (metric_name, var1)
      map1_path = dist_maps.GetMapPath(field1_name)

      # e.g. domain_X_flags__DID_PROCEED
      # Don't use .. in filenames since it could be confusing.
      # NOTE(review): the example above shows a double underscore, but '..' is
      # replaced with a single '_' here.  Left unchanged because output paths
      # depend on it -- confirm which one is intended.
      pair_name = '%s_X_%s' % (var1, var2.replace('..', '_'))
      output_dir = os.path.join(output_base_dir, metric_name, pair_name, date)

      yield metric_name, date, reports_path, var1, var2, map1_path, output_dir


def CreateOptionsParser():
  """Return the optparse.OptionParser for this script's flags."""
  p = optparse.OptionParser()

  p.add_option(
      '--bad-report-out', dest='bad_report', metavar='PATH', type='str',
      default='',
      help='Optionally write a report of input filenames with invalid field '
           'IDs to this file.')
  p.add_option(
      '--config-dir', dest='config_dir', metavar='PATH', type='str',
      default='',
      help='Directory with metadata schema and params files to read.')
  p.add_option(
      '--map-dir', dest='map_dir', metavar='PATH', type='str',
      default='',
      help='Directory with map files to read.')
  p.add_option(
      '--output-base-dir', dest='output_base_dir', metavar='PATH', type='str',
      default='',
      help='Root of the directory tree where analysis output will be placed.')
  p.add_option(
      '--field-ids', dest='field_ids', metavar='PATH', type='str',
      default='',
      help='Optional CSV file with field IDs (generally should not be used).')

  return p


def main(argv):
  """Write the task spec for the action in argv[1] ('dist' or 'assoc').

  Args:
    argv: full argument vector, including the program name at index 0.

  Raises:
    RuntimeError: if the action or a required flag is missing, or the action
      is not recognized.
  """
  (opts, argv) = CreateOptionsParser().parse_args(argv)

  # argv[0] is the program name, so the action is the second element.  Raise
  # RuntimeError rather than letting an IndexError escape, so the entry point
  # reports it as a normal fatal error.
  if len(argv) < 2:
    raise RuntimeError("Expected an action ('dist' or 'assoc')")
  action = argv[1]

  if not opts.config_dir:
    raise RuntimeError('--config-dir is required')
  if not opts.map_dir:
    raise RuntimeError('--map-dir is required')
  if not opts.output_base_dir:
    raise RuntimeError('--output-base-dir is required')

  # Open the bad report only after the flags have been validated, and make
  # sure it gets flushed and closed on every exit path.
  bad_f = None
  bad_c = None
  if opts.bad_report:
    bad_f = open(opts.bad_report, 'w')
    bad_c = csv.writer(bad_f)

  try:
    # This is shared between the two specs.
    path = os.path.join(opts.config_dir, 'dist-analysis.csv')
    with open(path) as f:
      dist_maps = DistMapLookup(f, opts.map_dir)

    path = os.path.join(opts.config_dir, 'rappor-vars.csv')
    with open(path) as f:
      var_schema = VarSchema(f, opts.config_dir)

    if action == 'dist':
      if opts.field_ids:
        with open(opts.field_ids) as f:
          field_id_lookup = CreateFieldIdLookup(f)
      else:
        field_id_lookup = {}

      input_iter = DistInputIter(sys.stdin)
      for row in DistTaskSpec(input_iter, field_id_lookup, var_schema,
                              dist_maps, bad_c):
        # The spec is a series of space-separated tokens.
        tokens = row + (opts.output_base_dir,)
        print(' '.join(str(t) for t in tokens))

    elif action == 'assoc':
      # Parse input
      input_iter = AssocInputIter(sys.stdin)

      # Create M x N association tasks
      var_pairs = CreateAssocVarPairs(var_schema.GetAssocMetrics())

      # Now add the other constant stuff
      for row in AssocTaskSpec(
          input_iter, var_pairs, dist_maps, opts.output_base_dir, bad_c):

        num_reports = 0  # placeholder, not filtering yet
        tokens = (num_reports,) + row
        print(' '.join(str(t) for t in tokens))

    else:
      raise RuntimeError('Invalid action %r' % action)

  finally:
    if bad_f is not None:
      bad_f.close()


if __name__ == '__main__':
  try:
    main(sys.argv)
  except IOError as e:
    if e.errno != errno.EPIPE:  # ignore broken pipe
      raise
  except RuntimeError as e:
    sys.stderr.write('FATAL: %s\n' % e)
    sys.exit(1)