1# Copyright 2014 Google Inc. All rights reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15library(parallel) # mclapply 16 17source.rappor <- function(rel_path) { 18 abs_path <- paste0(Sys.getenv("RAPPOR_REPO", ""), rel_path) 19 source(abs_path) 20} 21 22source.rappor("analysis/R/util.R") # for Log 23source.rappor("analysis/R/decode.R") # for ComputeCounts 24 25# 26# Tools used to estimate variable distributions of up to three variables 27# in RAPPOR. This contains the functions relevant to estimating joint 28# distributions. 29 30GetOtherProbs <- function(counts, map_by_cohort, marginal, params, pstar, 31 qstar) { 32 # Computes the marginal for the "other" category. 33 # 34 # Args: 35 # counts: m x (k+1) matrix with counts of each bit for each 36 # cohort (m=#cohorts total, k=# bits in bloom filter), first column 37 # stores the total counts 38 # map_by_cohort: list of matrices encoding locations of hashes for each 39 # string "other" category) 40 # marginal: object containing the estimated frequencies of known strings 41 # as well as the strings themselves, variance, etc. 42 # params: RAPPOR encoding parameters 43 # 44 # Returns: 45 # List of vectors of probabilities that each bit was set by the "other" 46 # category. The list is indexed by cohort. 47 48 N <- sum(counts[, 1]) 49 50 # Counts of known strings to remove from each cohort. 51 known_counts <- ceiling(marginal$proportion * N / params$m) 52 sum_known <- sum(known_counts) 53 54 # Select only the strings we care about from each cohort. 55 # NOTE: drop = FALSE necessary if there is one candidate 56 candidate_map <- lapply(map_by_cohort, function(map_for_cohort) { 57 map_for_cohort[, marginal$string, drop = FALSE] 58 }) 59 60 # If no strings were found, all nonzero counts were set by "other" 61 if (length(marginal) == 0) { 62 probs_other <- apply(counts, 1, function(cohort_row) { 63 cohort_row[-1] / cohort_row[1] 64 }) 65 return(as.list(as.data.frame(probs_other))) 66 } 67 68 # Counts set by known strings without noise considerations. 69 known_counts_by_cohort <- sapply(candidate_map, function(map_for_cohort) { 70 as.vector(as.matrix(map_for_cohort) %*% known_counts) 71 }) 72 73 # Protect against R's matrix/vector confusion. This ensures 74 # known_counts_by_cohort is a matrix in the k=1 case. 75 dim(known_counts_by_cohort) <- c(params$m, params$k) 76 77 # Counts set by known vals zero bits adjusting by p plus true bits 78 # adjusting by q. 79 known_counts_by_cohort <- (sum_known - known_counts_by_cohort) * pstar + 80 known_counts_by_cohort * qstar 81 82 # Add the left hand sums to make it a m x (k+1) "counts" matrix 83 known_counts_by_cohort <- cbind(sum_known, known_counts_by_cohort) 84 85 # Counts set by the "other" category. 86 reduced_counts <- counts - known_counts_by_cohort 87 reduced_counts[reduced_counts < 0] <- 0 88 probs_other <- apply(reduced_counts, 1, function(cohort_row) { 89 cohort_row[-1] / cohort_row[1] 90 }) 91 92 # Protect against R's matrix/vector confusion. 93 dim(probs_other) <- c(params$k, params$m) 94 95 probs_other[probs_other > 1] <- 1 96 probs_other[is.nan(probs_other)] <- 0 97 probs_other[is.infinite(probs_other)] <- 0 98 99 # Convert it from a k x m matrix to a list indexed by m cohorts. 100 # as.data.frame makes each cohort a column, which can be indexed by 101 # probs_other[[cohort]]. 102 result <- as.list(as.data.frame(probs_other)) 103 104 result 105} 106 107GetCondProbBooleanReports <- function(reports, pstar, qstar, num_cores) { 108 # Compute conditional probabilities given a set of Boolean reports. 109 # 110 # Args: 111 # reports: RAPPOR reports as a list of bit arrays (of length 1, because 112 # this is a boolean report) 113 # pstar, qstar: standard params computed from from rappor parameters 114 # num_cores: number of cores to pass to mclapply to parallelize apply 115 # 116 # Returns: 117 # Conditional probability of all boolean reports corresponding to 118 # candidates (TRUE, FALSE) 119 120 # The values below are p(report=1|value=TRUE), p(report=1|value=FALSE) 121 cond_probs_for_1 <- c(qstar, pstar) 122 # The values below are p(report=0|value=TRUE), p(report=0|value=FALSE) 123 cond_probs_for_0 <- c(1 - qstar, 1 - pstar) 124 125 cond_report_dist <- mclapply(reports, function(report) { 126 if (report[[1]] == 1) { 127 cond_probs_for_1 128 } else { 129 cond_probs_for_0 130 } 131 }, mc.cores = num_cores) 132 cond_report_dist 133} 134 135GetCondProbStringReports <- function(reports, cohorts, map, m, pstar, qstar, 136 marginal, prob_other = NULL, num_cores) { 137 # Wrapper around GetCondProb. Given a set of reports, cohorts, map and 138 # parameters m, p*, and q*, it first computes bit indices by cohort, and 139 # then applies GetCondProb individually to each report. 140 # 141 # Args: 142 # reports: RAPPOR reports as a list of bit arrays 143 # cohorts: cohorts corresponding to these reports as a list 144 # map: map file 145 # m, pstar, qstar: standard params computed from from rappor parameters 146 # marginal: list containing marginal estimates (output of Decode) 147 # prob_other: vector of length k, indicating how often each bit in the 148 # Bloom filter was set by a string in the "other" category. 149 # 150 # Returns: 151 # Conditional probability of all reports given each of the strings in 152 # marginal$string 153 154 # Get bit indices that are set per candidate per cohort 155 bit_indices_by_cohort <- lapply(1:m, function(cohort) { 156 map_for_cohort <- map$map_by_cohort[[cohort]] 157 # Find the bits set by the candidate strings 158 bit_indices <- lapply(marginal$string, function(x) { 159 which(map_for_cohort[, x]) 160 }) 161 bit_indices 162 }) 163 164 # Apply GetCondProb over all reports 165 cond_report_dist <- mclapply(seq(length(reports)), function(i) { 166 cohort <- cohorts[i] 167 #Log('Report %d, cohort %d', i, cohort) 168 bit_indices <- bit_indices_by_cohort[[cohort]] 169 GetCondProb(reports[[i]], pstar, qstar, bit_indices, 170 prob_other = prob_other[[cohort]]) 171 }, mc.cores = num_cores) 172 cond_report_dist 173} 174 175 176GetCondProb <- function(report, pstar, qstar, bit_indices, prob_other = NULL) { 177 # Given the observed bit array, estimate P(report | true value). 178 # Probabilities are estimated for all truth values. 179 # 180 # Args: 181 # report: A single observed RAPPOR report (binary vector of length k). 182 # params: RAPPOR parameters. 183 # bit_indices: list with one entry for each candidate. Each entry is an 184 # integer vector of length h, specifying which bits are set for the 185 # candidate in the report's cohort. 186 # prob_other: vector of length k, indicating how often each bit in the 187 # Bloom filter was set by a string in the "other" category. 188 # 189 # Returns: 190 # Conditional probability of report given each of the strings in 191 # candidate_strings 192 ones <- sum(report) 193 zeros <- length(report) - ones 194 probs <- ifelse(report == 1, pstar, 1 - pstar) 195 196 # Find the likelihood of report given each candidate string 197 prob_obs_vals <- sapply(bit_indices, function(x) { 198 prod(c(probs[-x], ifelse(report[x] == 1, qstar, 1 - qstar))) 199 }) 200 201 # Account for the "other" category 202 if (!is.null(prob_other)) { 203 prob_other <- prod(c(prob_other[which(report == 1)], 204 (1 - prob_other)[which(report == 0)])) 205 c(prob_obs_vals, prob_other) 206 } else { 207 prob_obs_vals 208 } 209} 210 211UpdatePij <- function(pij, cond_prob) { 212 # Update the probability matrix based on the EM algorithm. 213 # 214 # Args: 215 # pij: conditional distribution of x (vector) 216 # cond_prob: conditional distribution computed previously 217 # 218 # Returns: 219 # Updated pijs from em algorithm (maximization) 220 221 # NOTE: Not using mclapply here because we have a faster C++ implementation. 222 # mclapply spawns multiple processes, and each process can take up 3 GB+ or 5 223 # GB+ of memory. 224 wcp <- lapply(cond_prob, function(x) { 225 z <- x * pij 226 z <- z / sum(z) 227 z[is.nan(z)] <- 0 228 z 229 }) 230 Reduce("+", wcp) / length(wcp) 231} 232 233ComputeVar <- function(cond_prob, est) { 234 # Computes the variance of the estimated pij's. 235 # 236 # Args: 237 # cond_prob: conditional distribution computed previously 238 # est: estimated pij's 239 # 240 # Returns: 241 # Variance of the estimated pij's 242 243 inform <- Reduce("+", lapply(cond_prob, function(x) { 244 (outer(as.vector(x), as.vector(x))) / (sum(x * est))^2 245 })) 246 var_cov <- solve(inform) 247 sd <- matrix(sqrt(diag(var_cov)), dim(cond_prob[[1]])) 248 list(var_cov = var_cov, sd = sd, inform = inform) 249} 250 251EM <- function(cond_prob, starting_pij = NULL, estimate_var = FALSE, 252 max_em_iters = 1000, epsilon = 10^-6, verbose = FALSE) { 253 # Performs estimation. 254 # 255 # Args: 256 # cond_prob: conditional distribution computed previously 257 # starting_pij: estimated pij's 258 # estimate_var: flags whether we should estimate the variance 259 # of our computed distribution 260 # max_em_iters: maximum number of EM iterations 261 # epsilon: convergence parameter 262 # verbose: flags whether to display error data 263 # 264 # Returns: 265 # Estimated pij's, variance, error params 266 267 pij <- list() 268 state_space <- dim(cond_prob[[1]]) 269 if (is.null(starting_pij)) { 270 pij[[1]] <- array(1 / prod(state_space), state_space) 271 } else { 272 pij[[1]] <- starting_pij 273 } 274 275 i <- 0 # visible outside loop 276 if (nrow(pij[[1]]) > 0) { 277 # Run EM 278 for (i in 1:max_em_iters) { 279 pij[[i + 1]] <- UpdatePij(pij[[i]], cond_prob) 280 dif <- max(abs(pij[[i + 1]] - pij[[i]])) 281 if (dif < epsilon) { 282 break 283 } 284 Log('EM iteration %d, dif = %e', i, dif) 285 } 286 } 287 # Compute the variance of the estimate. 288 est <- pij[[length(pij)]] 289 if (estimate_var) { 290 var_cov <- ComputeVar(cond_prob, est) 291 sd <- var_cov$sd 292 inform <- var_cov$inform 293 var_cov <- var_cov$var_cov 294 } else { 295 var_cov <- NULL 296 inform <- NULL 297 sd <- NULL 298 } 299 list(est = est, sd = sd, var_cov = var_cov, hist = pij, num_em_iters = i) 300} 301 302TestIndependence <- function(est, inform) { 303 # Tests the degree of independence between variables. 304 # 305 # Args: 306 # est: esimated pij values 307 # inform: information matrix 308 # 309 # Returns: 310 # Chi-squared statistic for whether two variables are independent 311 312 expec <- outer(apply(est, 1, sum), apply(est, 2, sum)) 313 diffs <- matrix(est - expec, ncol = 1) 314 stat <- t(diffs) %*% inform %*% diffs 315 df <- (nrow(est) - 1) * (ncol(est) - 1) 316 list(stat = stat, pval = pchisq(stat, df, lower = FALSE)) 317} 318 319UpdateJointConditional <- function(cond_report_dist, joint_conditional = NULL) { 320 # Updates the joint conditional distribution of d variables, where 321 # num_variables is chosen by the client. Since variables are conditionally 322 # independent of one another, this is basically an outer product. 323 # 324 # Args: 325 # joint_conditional: The current state of the joint conditional 326 # distribution. This is a list with as many elements as there 327 # are reports. 328 # cond_report_dist: The conditional distribution of variable x, which will 329 # be outer-producted with the current joint conditional. 330 # 331 # Returns: 332 # A list of same length as joint_conditional containing the joint 333 # conditional distribution of all variables. If I want 334 # P(X'=x',Y=y'|X=x,Y=y), I will look at 335 # joint_conditional[x,x',y,y']. 336 337 if (is.null(joint_conditional)) { 338 lapply(cond_report_dist, function(x) array(x)) 339 } else { 340 mapply("outer", joint_conditional, cond_report_dist, 341 SIMPLIFY = FALSE) 342 } 343} 344 345ComputeDistributionEM <- function(reports, report_cohorts, maps, 346 ignore_other = FALSE, 347 params = NULL, 348 params_list = NULL, 349 marginals = NULL, 350 estimate_var = FALSE, 351 num_cores = 10, 352 em_iter_func = EM, 353 max_em_iters = 1000) { 354 # Computes the distribution of num_variables variables, where 355 # num_variables is chosen by the client, using the EM algorithm. 356 # 357 # Args: 358 # reports: A list of num_variables elements, each a 2-dimensional array 359 # containing the counts of each bin for each report 360 # report_cohorts: A num_variables-element list; the ith element is an array 361 # containing the cohort of jth report for ith variable. 362 # maps: A num_variables-element list containing the map for each variable 363 # ignore_other: A boolean describing whether to compute the "other" category 364 # params: RAPPOR encoding parameters. If set, all variables are assumed to 365 # be encoded with these parameters. 366 # params_list: A list of num_variables elements, each of which is the 367 # RAPPOR encoding parameters for a variable (a list itself). If set, 368 # it must be the same length as 'reports'. 369 # marginals: List of estimated marginals for each variable 370 # estimate_var: A flag telling whether to estimate the variance. 371 # em_iter_func: Function that implements the iterative EM algorithm. 372 373 # Handle the case that the client wants to find the joint distribution of too 374 # many variables. 375 num_variables <- length(reports) 376 377 if (is.null(params) && is.null(params_list)) { 378 stop("Either params or params_list must be passed") 379 } 380 381 Log('Computing joint conditional') 382 383 # Compute the counts for each variable and then do conditionals. 384 joint_conditional = NULL 385 found_strings <- list() 386 387 for (j in (1:num_variables)) { 388 Log('Processing var %d', j) 389 390 var_report <- reports[[j]] 391 var_cohort <- report_cohorts[[j]] 392 var_map <- maps[[j]] 393 if (!is.null(params)) { 394 var_params <- params 395 } else { 396 var_params <- params_list[[j]] 397 } 398 399 var_counts <- NULL 400 if (is.null(marginals)) { 401 Log('\tSumming bits to gets observed counts') 402 var_counts <- ComputeCounts(var_report, var_cohort, var_params) 403 404 Log('\tDecoding marginal') 405 marginal <- Decode(var_counts, var_map$all_cohorts_map, var_params, 406 quiet = TRUE)$fit 407 Log('\tMarginal for var %d has %d values:', j, nrow(marginal)) 408 print(marginal[, c('estimate', 'proportion')]) # rownames are the string 409 cat('\n') 410 411 if (nrow(marginal) == 0) { 412 Log('ERROR: Nothing decoded for variable %d', j) 413 return (NULL) 414 } 415 } else { 416 marginal <- marginals[[j]] 417 } 418 found_strings[[j]] <- marginal$string 419 420 p <- var_params$p 421 q <- var_params$q 422 f <- var_params$f 423 # pstar and qstar needed to compute other probabilities as well as for 424 # inputs to GetCondProb{Boolean, String}Reports subsequently 425 pstar <- (1 - f / 2) * p + (f / 2) * q 426 qstar <- (1 - f / 2) * q + (f / 2) * p 427 k <- var_params$k 428 429 # Ignore other probability if either ignore_other is set or k == 1 430 # (Boolean RAPPOR) 431 if (ignore_other || (k == 1)) { 432 prob_other <- vector(mode = "list", length = var_params$m) 433 } else { 434 # Compute the probability of the "other" category 435 if (is.null(var_counts)) { 436 var_counts <- ComputeCounts(var_report, var_cohort, var_params) 437 } 438 prob_other <- GetOtherProbs(var_counts, var_map$map_by_cohort, marginal, 439 var_params, pstar, qstar) 440 found_strings[[j]] <- c(found_strings[[j]], "Other") 441 } 442 443 # Get the joint conditional distribution 444 Log('\tGetCondProb for each report (%d cores)', num_cores) 445 446 # TODO(pseudorandom): check RAPPOR type more systematically instead of by 447 # checking if k == 1 448 if (k == 1) { 449 cond_report_dist <- GetCondProbBooleanReports(var_report, pstar, qstar, 450 num_cores) 451 } else { 452 cond_report_dist <- GetCondProbStringReports(var_report, 453 var_cohort, var_map, var_params$m, pstar, qstar, 454 marginal, prob_other, num_cores) 455 } 456 457 Log('\tUpdateJointConditional') 458 459 # Update the joint conditional distribution of all variables 460 joint_conditional <- UpdateJointConditional(cond_report_dist, 461 joint_conditional) 462 } 463 464 N <- length(joint_conditional) 465 dimensions <- dim(joint_conditional[[1]]) 466 # e.g. 2 x 3 467 dimensions_str <- paste(dimensions, collapse = ' x ') 468 total_entries <- prod(c(N, dimensions)) 469 470 Log('Starting EM with N = %d matrices of size %s (%d entries)', 471 N, dimensions_str, total_entries) 472 473 start_time <- proc.time()[['elapsed']] 474 475 # Run expectation maximization to find joint distribution 476 em <- em_iter_func(joint_conditional, max_em_iters=max_em_iters, 477 epsilon = 10 ^ -6, verbose = FALSE, 478 estimate_var = estimate_var) 479 480 em_elapsed_time <- proc.time()[['elapsed']] - start_time 481 482 dimnames(em$est) <- found_strings 483 # Return results in a usable format 484 list(fit = em$est, 485 sd = em$sd, 486 em_elapsed_time = em_elapsed_time, 487 num_em_iters = em$num_em_iters, 488 # This last field is implementation-specific; it can be used for 489 # interactive debugging. 490 em = em) 491} 492