1# Lint as: python2, python3 2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6import logging, os, shutil, tempfile 7 8import common 9 10from autotest_lib.client.bin import utils 11from autotest_lib.client.common_lib import autotemp, error 12from autotest_lib.client.cros import constants 13from autotest_lib.client.cros import cros_ui 14from autotest_lib.client.cros import cryptohome 15 16 17PK12UTIL = 'pk12util' 18CERTUTIL = 'certutil' 19OPENSSLP12 = 'openssl pkcs12' 20OPENSSLX509 = 'openssl x509' 21OPENSSLRSA = 'openssl rsa' 22OPENSSLREQ = 'openssl req' 23OPENSSLCRYPTO = 'openssl sha1' 24 25TESTUSER = '[email protected]' 26TESTPASS = 'testme' 27 28 29class OwnershipError(error.TestError): 30 """Generic error for ownership-related failures.""" 31 pass 32 33 34class scoped_tempfile(object): 35 """A wrapper that provides scoped semantics for temporary files. 36 37 Providing a file path causes the scoped_tempfile to take ownership of the 38 file at the provided path. The file at the path will be deleted when this 39 object goes out of scope. If no path is provided, then a temporary file 40 object will be created for the lifetime of the scoped_tempfile 41 42 autotemp.tempfile objects don't seem to play nicely with being 43 used in system commands, so they can't be used for my purposes. 44 """ 45 46 tempdir = autotemp.tempdir(unique_id='ownership') 47 48 def __init__(self, name=None): 49 self.name = name 50 if not self.name: 51 self.fo = tempfile.TemporaryFile() 52 53 54 def __del__(self): 55 if self.name: 56 if os.path.exists(self.name): 57 os.unlink(self.name) 58 else: 59 self.fo.close() # Will destroy the underlying tempfile 60 61 62def system_output_on_fail(cmd): 63 """Run a |cmd|, capturing output and logging it only on error. 64 65 @param cmd: the command to run. 66 """ 67 output = None 68 try: 69 output = utils.system_output(cmd) 70 except: 71 logging.error(output) 72 raise 73 74 75def __unlink(filename): 76 """unlink a file, but log OSError and IOError instead of raising. 77 78 This allows unlinking files that don't exist safely. 79 80 @param filename: the file to attempt to unlink. 81 """ 82 try: 83 os.unlink(filename) 84 except (IOError, OSError) as error: 85 logging.info(error) 86 87 88def restart_ui_to_clear_ownership_files(): 89 """Remove on-disk state related to device ownership. 90 91 The UI must be stopped while we do this, or the session_manager will 92 write the policy and key files out again. 93 """ 94 cros_ui.stop(allow_fail=not cros_ui.is_up()) 95 clear_ownership_files_no_restart() 96 cros_ui.start() 97 98 99def clear_ownership_files_no_restart(): 100 """Remove on-disk state related to device ownership. 101 102 The UI must be stopped while we do this, or the session_manager will 103 write the policy and key files out again. 104 """ 105 if cros_ui.is_up(): 106 raise error.TestError("Tried to clear ownership with UI running.") 107 __unlink(constants.OWNER_KEY_FILE) 108 __unlink(constants.SIGNED_POLICY_FILE) 109 __unlink(os.path.join(constants.USER_DATA_DIR, 'Local State')) 110 111 112def fake_ownership(): 113 """Fake ownership by generating the necessary magic files.""" 114 # Determine the module directory. 115 dirname = os.path.dirname(__file__) 116 mock_certfile = os.path.join(dirname, constants.MOCK_OWNER_CERT) 117 mock_signedpolicyfile = os.path.join(dirname, 118 constants.MOCK_OWNER_POLICY) 119 utils.open_write_close(constants.OWNER_KEY_FILE, 120 cert_extract_pubkey_der(mock_certfile)) 121 shutil.copy(mock_signedpolicyfile, 122 constants.SIGNED_POLICY_FILE) 123 124 125POLICY_TYPE = 'google/chromeos/device' 126 127 128def assert_has_policy_data(response_proto): 129 """Assert that given protobuf has a policy_data field. 130 131 @param response_proto: a PolicyFetchResponse protobuf. 132 @raises OwnershipError on failure. 133 """ 134 if not response_proto.HasField("policy_data"): 135 raise OwnershipError('Malformatted response.') 136 137 138def assert_has_device_settings(data_proto): 139 """Assert that given protobuf is a policy with device settings in it. 140 141 @param data_proto: a PolicyData protobuf. 142 @raises OwnershipError if this isn't CrOS policy, or has no settings inside. 143 """ 144 if (not data_proto.HasField("policy_type") or 145 data_proto.policy_type != POLICY_TYPE or 146 not data_proto.HasField("policy_value")): 147 raise OwnershipError('Malformatted response.') 148 149 150def assert_username(data_proto, username): 151 """Assert that given protobuf is a policy associated with the given user. 152 153 @param data_proto: a PolicyData protobuf. 154 @param username: the username to check for 155 @raises OwnershipError if data_proto isn't associated with username 156 """ 157 if data_proto.username != username: 158 raise OwnershipError('Incorrect username.') 159 160 161def assert_guest_setting(settings, guests): 162 """Assert that given protobuf has given guest-related settings. 163 164 @param settings: a ChromeDeviceSettingsProto protobuf. 165 @param guests: boolean indicating whether guests are allowed to sign in. 166 @raises OwnershipError if settings doesn't enforce the provided setting. 167 """ 168 if not settings.HasField("guest_mode_enabled"): 169 raise OwnershipError('No guest mode setting protobuf.') 170 if not settings.guest_mode_enabled.HasField("guest_mode_enabled"): 171 raise OwnershipError('No guest mode setting.') 172 if settings.guest_mode_enabled.guest_mode_enabled != guests: 173 raise OwnershipError('Incorrect guest mode setting.') 174 175 176def assert_show_users(settings, show_users): 177 """Assert that given protobuf has given user-avatar-showing settings. 178 179 @param settings: a ChromeDeviceSettingsProto protobuf. 180 @param show_users: boolean indicating whether avatars are shown on sign in. 181 @raises OwnershipError if settings doesn't enforce the provided setting. 182 """ 183 if not settings.HasField("show_user_names"): 184 raise OwnershipError('No show users setting protobuf.') 185 if not settings.show_user_names.HasField("show_user_names"): 186 raise OwnershipError('No show users setting.') 187 if settings.show_user_names.show_user_names != show_users: 188 raise OwnershipError('Incorrect show users setting.') 189 190 191def assert_roaming(settings, roaming): 192 """Assert that given protobuf has given roaming settings. 193 194 @param settings: a ChromeDeviceSettingsProto protobuf. 195 @param roaming: boolean indicating whether roaming is allowed. 196 @raises OwnershipError if settings doesn't enforce the provided setting. 197 """ 198 if not settings.HasField("data_roaming_enabled"): 199 raise OwnershipError('No roaming setting protobuf.') 200 if not settings.data_roaming_enabled.HasField("data_roaming_enabled"): 201 raise OwnershipError('No roaming setting.') 202 if settings.data_roaming_enabled.data_roaming_enabled != roaming: 203 raise OwnershipError('Incorrect roaming setting.') 204 205 206def assert_new_users(settings, new_users): 207 """Assert that given protobuf has given new user settings. 208 209 @param settings: a ChromeDeviceSettingsProto protobuf. 210 @param new_users: boolean indicating whether adding users is allowed. 211 @raises OwnershipError if settings doesn't enforce the provided setting. 212 """ 213 if not settings.HasField("allow_new_users"): 214 raise OwnershipError('No allow new users setting protobuf.') 215 if not settings.allow_new_users.HasField("allow_new_users"): 216 raise OwnershipError('No allow new users setting.') 217 if settings.allow_new_users.allow_new_users != new_users: 218 raise OwnershipError('Incorrect allow new users setting.') 219 220 221def __user_nssdb(user): 222 """Returns the path to the NSSDB for the provided user. 223 224 @param user: the user whose NSSDB the caller wants. 225 @return: absolute path to user's NSSDB. 226 """ 227 return os.path.join(cryptohome.user_path(user), '.pki', 'nssdb') 228 229 230def use_known_ownerkeys(user): 231 """Sets the system up to use a well-known keypair for owner operations. 232 233 Assuming the appropriate cryptohome is already mounted, configures the 234 device to accept policies signed with the checked-in 'mock' owner key. 235 236 @param user: the user whose NSSDB should be populated with key material. 237 """ 238 dirname = os.path.dirname(__file__) 239 mock_keyfile = os.path.join(dirname, constants.MOCK_OWNER_KEY) 240 mock_certfile = os.path.join(dirname, constants.MOCK_OWNER_CERT) 241 push_to_nss(mock_keyfile, mock_certfile, __user_nssdb(user)) 242 utils.open_write_close(constants.OWNER_KEY_FILE, 243 cert_extract_pubkey_der(mock_certfile)) 244 245 246def known_privkey(): 247 """Returns the mock owner private key in PEM format. 248 249 @return: mock owner private key in PEM format. 250 """ 251 dirname = os.path.dirname(__file__) 252 return utils.read_file(os.path.join(dirname, constants.MOCK_OWNER_KEY)) 253 254 255def known_pubkey(): 256 """Returns the mock owner public key in DER format. 257 258 @return: mock owner public key in DER format. 259 """ 260 dirname = os.path.dirname(__file__) 261 return cert_extract_pubkey_der(os.path.join(dirname, 262 constants.MOCK_OWNER_CERT)) 263 264 265def pairgen(): 266 """Generate a self-signed cert and associated private key. 267 268 Generates a self-signed X509 certificate and the associated private key. 269 The key is 2048 bits. The generated material is stored in PEM format 270 and the paths to the two files are returned. 271 272 The caller is responsible for cleaning up these files. 273 274 @return: (/path/to/private_key, /path/to/self-signed_cert) 275 """ 276 keyfile = scoped_tempfile.tempdir.name + '/private.key' 277 certfile = scoped_tempfile.tempdir.name + '/cert.pem' 278 cmd = '%s -x509 -subj %s -newkey rsa:2048 -nodes -keyout %s -out %s' % ( 279 OPENSSLREQ, '/CN=me', keyfile, certfile) 280 system_output_on_fail(cmd) 281 return (keyfile, certfile) 282 283 284def pairgen_as_data(): 285 """Generates keypair, returns keys as data. 286 287 Generates a fresh owner keypair and then passes back the 288 PEM-encoded private key and the DER-encoded public key. 289 290 @return: (PEM-encoded private key, DER-encoded public key) 291 """ 292 (keypath, certpath) = pairgen() 293 keyfile = scoped_tempfile(keypath) 294 certfile = scoped_tempfile(certpath) 295 return (utils.read_file(keyfile.name), 296 cert_extract_pubkey_der(certfile.name)) 297 298 299def push_to_nss(keyfile, certfile, nssdb): 300 """Takes a pre-generated key pair and pushes them to an NSS DB. 301 302 Given paths to a private key and cert in PEM format, stores the pair 303 in the provided nssdb. 304 305 @param keyfile: path to PEM-formatted private key file. 306 @param certfile: path to PEM-formatted cert file for associated public key. 307 @param nssdb: path to NSSDB to be populated with the provided keys. 308 """ 309 for_push = scoped_tempfile(scoped_tempfile.tempdir.name + '/for_push.p12') 310 cmd = '%s -export -in %s -inkey %s -out %s ' % ( 311 OPENSSLP12, certfile, keyfile, for_push.name) 312 cmd += '-passin pass: -passout pass:' 313 system_output_on_fail(cmd) 314 cmd = '%s -d "sql:%s" -i %s -W ""' % (PK12UTIL, 315 nssdb, 316 for_push.name) 317 system_output_on_fail(cmd) 318 319 320def cert_extract_pubkey_der(pem): 321 """Given a PEM-formatted cert, extracts the public key in DER format. 322 323 Pass in an X509 certificate in PEM format, and you'll get back the 324 DER-formatted public key as a string. 325 326 @param pem: path to a PEM-formatted cert file. 327 @return: DER-encoded public key from cert, as a string. 328 """ 329 outfile = scoped_tempfile(scoped_tempfile.tempdir.name + '/pubkey.der') 330 cmd = '%s -in %s -pubkey -noout ' % (OPENSSLX509, pem) 331 cmd += '| %s -outform DER -pubin -out %s' % (OPENSSLRSA, 332 outfile.name) 333 system_output_on_fail(cmd) 334 der = utils.read_file(outfile.name) 335 return der 336 337 338def sign(pem_key, data): 339 """Signs |data| with key from |pem_key|, returns signature. 340 341 Using the PEM-formatted private key in |pem_key|, generates an 342 RSA-with-SHA1 signature over |data| and returns the signature in 343 a string. 344 345 @param pem_key: PEM-formatted private key, as a string. 346 @param data: data to be signed. 347 @return: signature as a string. 348 """ 349 sig = scoped_tempfile() 350 err = scoped_tempfile() 351 data_file = scoped_tempfile() 352 data_file.fo.write(data) 353 data_file.fo.seek(0) 354 355 pem_key_file = scoped_tempfile(scoped_tempfile.tempdir.name + '/pkey.pem') 356 utils.open_write_close(pem_key_file.name, pem_key) 357 358 cmd = '%s -sign %s' % (OPENSSLCRYPTO, pem_key_file.name) 359 try: 360 utils.run(cmd, 361 stdin=data_file.fo, 362 stdout_tee=sig.fo, 363 stderr_tee=err.fo) 364 except: 365 err.fo.seek(0) 366 logging.error(err.fo.read()) 367 raise 368 369 sig.fo.seek(0) 370 sig_data = sig.fo.read() 371 if not sig_data: 372 raise error.OwnershipError('Empty signature!') 373 return sig_data 374 375 376def get_user_policy_key_filename(username): 377 """Returns the path to the user policy key for the given username. 378 379 @param username: the user whose policy key we want the path to. 380 @return: absolute path to user's policy key file. 381 """ 382 return os.path.join(constants.USER_POLICY_DIR, 383 cryptohome.get_user_hash(username), 384 constants.USER_POLICY_KEY_FILENAME) 385