xref: /aosp_15_r20/external/autotest/client/cros/ownership.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import logging, os, shutil, tempfile
7
8import common
9
10from autotest_lib.client.bin import utils
11from autotest_lib.client.common_lib import autotemp, error
12from autotest_lib.client.cros import constants
13from autotest_lib.client.cros import cros_ui
14from autotest_lib.client.cros import cryptohome
15
16
# Names of the external command-line tools (and openssl subcommands) that the
# helpers in this module interpolate into the command strings they run.
PK12UTIL = 'pk12util'
CERTUTIL = 'certutil'
OPENSSLP12 = 'openssl pkcs12'
OPENSSLX509 = 'openssl x509'
OPENSSLRSA = 'openssl rsa'
OPENSSLREQ = 'openssl req'
# Used by sign() as '<OPENSSLCRYPTO> -sign <keyfile>'.
OPENSSLCRYPTO = 'openssl sha1'

# Test account credentials; not referenced by any helper visible in this file,
# presumably consumed by the tests that import this module.
# NOTE(review): '[email protected]' looks like an e-mail-obfuscation placeholder
# rather than a real address -- confirm the intended value against upstream.
TESTUSER = '[email protected]'
TESTPASS = 'testme'
27
28
class OwnershipError(error.TestError):
    """Test error raised by the ownership checks and helpers in this module."""
32
33
class scoped_tempfile(object):
    """Ties the lifetime of a temporary file to the lifetime of an object.

    When constructed with a path, the object takes ownership of the file at
    that path and unlinks it when the object is destroyed.  When constructed
    without a path, an anonymous tempfile.TemporaryFile is opened, exposed as
    |fo|, and closed (which destroys it) when the object is destroyed.

    autotemp.tempfile objects don't seem to play nicely with being
    used in system commands, so they can't be used here.

    Cleanup runs from __del__, so callers must keep a reference to the
    object for as long as they need the file.
    """

    # Scratch directory shared by the whole module; the helpers below build
    # the paths they hand to scoped_tempfile underneath tempdir.name.
    tempdir = autotemp.tempdir(unique_id='ownership')

    def __init__(self, name=None):
        """Take ownership of |name|, or open an anonymous temporary file.

        @param name: path of the file to delete on destruction.  If this is
                     None or empty, a tempfile.TemporaryFile is created
                     instead and stored in self.fo.
        """
        self.name = name
        if not self.name:
            self.fo = tempfile.TemporaryFile()


    def __del__(self):
        """Delete the owned file, or close the anonymous temporary file."""
        # __del__ also runs when __init__ raised part-way through (for example
        # if TemporaryFile() failed), in which case the attributes may be
        # missing; getattr keeps the finalizer from raising AttributeError.
        name = getattr(self, 'name', None)
        if name:
            try:
                os.unlink(name)
            except OSError:
                # The file is already gone or cannot be removed.  Attempting
                # the unlink directly avoids the exists()/unlink() race, and
                # an exception escaping __del__ would only be reported and
                # dropped by the interpreter anyway.
                pass
            return
        fo = getattr(self, 'fo', None)
        if fo is not None:
            fo.close()  # Will destroy the underlying tempfile
60
61
def system_output_on_fail(cmd):
    """Run |cmd|, logging details only if running it fails.

    On success the captured output is discarded and nothing is logged.  If
    utils.system_output raises, the command and the exception are logged at
    error level and the exception is re-raised unchanged.

    @param cmd: the command to run.
    @raises whatever utils.system_output raises for a failed command.
    """
    try:
        utils.system_output(cmd)
    except Exception as cmd_error:
        # The result of system_output() is never available on this path (the
        # call raised before returning), so the exception itself is the only
        # source of detail.  Presumably its text includes the command's
        # output -- verify against utils.system_output.
        logging.error('Command %r failed: %s', cmd, cmd_error)
        raise
73
74
def __unlink(filename):
    """Best-effort removal of |filename|.

    A failure to remove the file (for instance because it does not exist) is
    logged at info level and otherwise ignored, so callers can use this on
    paths that may or may not be present.

    @param filename: the file to attempt to unlink.
    """
    try:
        os.remove(filename)
    except (IOError, OSError) as unlink_failure:
        logging.info(unlink_failure)
86
87
def restart_ui_to_clear_ownership_files():
    """Wipe on-disk device ownership state, bouncing the UI around the wipe.

    The UI is stopped first, because otherwise the session_manager will
    write the policy and key files out again, and it is started again once
    the files have been removed.
    """
    ui_was_up = cros_ui.is_up()
    # A failed stop is only tolerated when the UI was not running to begin with.
    cros_ui.stop(allow_fail=not ui_was_up)
    clear_ownership_files_no_restart()
    cros_ui.start()
97
98
def clear_ownership_files_no_restart():
    """Delete the files that record device ownership, leaving the UI alone.

    The caller must have stopped the UI already; otherwise the
    session_manager will write the policy and key files out again.

    @raises error.TestError if the UI is running.
    """
    if cros_ui.is_up():
        raise error.TestError("Tried to clear ownership with UI running.")
    ownership_state = (
        constants.OWNER_KEY_FILE,
        constants.SIGNED_POLICY_FILE,
        os.path.join(constants.USER_DATA_DIR, 'Local State'),
    )
    for path in ownership_state:
        __unlink(path)
110
111
def fake_ownership():
    """Make the device look owned by installing the checked-in mock files.

    The DER public key extracted from the mock owner cert is written to the
    owner key file, and the mock signed policy is copied into place.
    """
    # The mock files live next to this module.
    module_dir = os.path.dirname(__file__)
    cert_path = os.path.join(module_dir, constants.MOCK_OWNER_CERT)
    policy_path = os.path.join(module_dir, constants.MOCK_OWNER_POLICY)

    owner_pubkey = cert_extract_pubkey_der(cert_path)
    utils.open_write_close(constants.OWNER_KEY_FILE, owner_pubkey)
    shutil.copy(policy_path, constants.SIGNED_POLICY_FILE)
123
124
# The policy_type value that assert_has_device_settings() requires a
# PolicyData protobuf to carry.
POLICY_TYPE = 'google/chromeos/device'
126
127
def assert_has_policy_data(response_proto):
    """Check that a policy fetch response carries a policy_data field.

    @param response_proto: a PolicyFetchResponse protobuf.
    @raises OwnershipError if the policy_data field is not set.
    """
    if response_proto.HasField("policy_data"):
        return
    raise OwnershipError('Malformatted response.')
136
137
def assert_has_device_settings(data_proto):
    """Check that a policy protobuf is CrOS device policy with a payload.

    The protobuf must have a policy_type equal to POLICY_TYPE and must have
    its policy_value field set.

    @param data_proto: a PolicyData protobuf.
    @raises OwnershipError if this isn't CrOS policy, or has no settings inside.
    """
    is_device_policy = (data_proto.HasField("policy_type") and
                        data_proto.policy_type == POLICY_TYPE and
                        data_proto.HasField("policy_value"))
    if not is_device_policy:
        raise OwnershipError('Malformatted response.')
148
149
def assert_username(data_proto, username):
    """Check that a policy protobuf belongs to the expected user.

    @param data_proto: a PolicyData protobuf.
    @param username: the username to check for
    @raises OwnershipError if data_proto isn't associated with username
    """
    if data_proto.username == username:
        return
    raise OwnershipError('Incorrect username.')
159
160
def assert_guest_setting(settings, guests):
    """Check the guest-mode setting carried by a device settings protobuf.

    @param settings: a ChromeDeviceSettingsProto protobuf.
    @param guests: boolean indicating whether guests are allowed to sign in.
    @raises OwnershipError if settings doesn't enforce the provided setting.
    """
    # The outer message and the value nested inside it share this field name.
    field = "guest_mode_enabled"
    if not settings.HasField(field):
        raise OwnershipError('No guest mode setting protobuf.')
    guest_mode = settings.guest_mode_enabled
    if not guest_mode.HasField(field):
        raise OwnershipError('No guest mode setting.')
    if guest_mode.guest_mode_enabled != guests:
        raise OwnershipError('Incorrect guest mode setting.')
174
175
def assert_show_users(settings, show_users):
    """Check the show-user-names setting carried by a device settings protobuf.

    @param settings: a ChromeDeviceSettingsProto protobuf.
    @param show_users: boolean indicating whether avatars are shown on sign in.
    @raises OwnershipError if settings doesn't enforce the provided setting.
    """
    # The outer message and the value nested inside it share this field name.
    field = "show_user_names"
    if not settings.HasField(field):
        raise OwnershipError('No show users setting protobuf.')
    show_names = settings.show_user_names
    if not show_names.HasField(field):
        raise OwnershipError('No show users setting.')
    if show_names.show_user_names != show_users:
        raise OwnershipError('Incorrect show users setting.')
189
190
def assert_roaming(settings, roaming):
    """Check the data-roaming setting carried by a device settings protobuf.

    @param settings: a ChromeDeviceSettingsProto protobuf.
    @param roaming: boolean indicating whether roaming is allowed.
    @raises OwnershipError if settings doesn't enforce the provided setting.
    """
    # The outer message and the value nested inside it share this field name.
    field = "data_roaming_enabled"
    if not settings.HasField(field):
        raise OwnershipError('No roaming setting protobuf.')
    roaming_setting = settings.data_roaming_enabled
    if not roaming_setting.HasField(field):
        raise OwnershipError('No roaming setting.')
    if roaming_setting.data_roaming_enabled != roaming:
        raise OwnershipError('Incorrect roaming setting.')
204
205
def assert_new_users(settings, new_users):
    """Check the allow-new-users setting carried by a device settings protobuf.

    @param settings: a ChromeDeviceSettingsProto protobuf.
    @param new_users: boolean indicating whether adding users is allowed.
    @raises OwnershipError if settings doesn't enforce the provided setting.
    """
    # The outer message and the value nested inside it share this field name.
    field = "allow_new_users"
    if not settings.HasField(field):
        raise OwnershipError('No allow new users setting protobuf.')
    allow_setting = settings.allow_new_users
    if not allow_setting.HasField(field):
        raise OwnershipError('No allow new users setting.')
    if allow_setting.allow_new_users != new_users:
        raise OwnershipError('Incorrect allow new users setting.')
219
220
def __user_nssdb(user):
    """Build the path to |user|'s NSSDB.

    The database lives at .pki/nssdb underneath the path that
    cryptohome.user_path() reports for the user.

    @param user: the user whose NSSDB the caller wants.
    @return: path to user's NSSDB.
    """
    user_home = cryptohome.user_path(user)
    return os.path.join(user_home, '.pki', 'nssdb')
228
229
def use_known_ownerkeys(user):
    """Install the checked-in 'mock' owner keypair for owner operations.

    Assuming the appropriate cryptohome is already mounted, pushes the mock
    key and cert into |user|'s NSSDB and writes the matching public key to
    the owner key file, so the device accepts policies signed with the mock
    owner key.

    @param user: the user whose NSSDB should be populated with key material.
    """
    # The mock key material lives next to this module.
    module_dir = os.path.dirname(__file__)
    key_path = os.path.join(module_dir, constants.MOCK_OWNER_KEY)
    cert_path = os.path.join(module_dir, constants.MOCK_OWNER_CERT)

    nssdb_path = __user_nssdb(user)
    push_to_nss(key_path, cert_path, nssdb_path)

    owner_pubkey = cert_extract_pubkey_der(cert_path)
    utils.open_write_close(constants.OWNER_KEY_FILE, owner_pubkey)
244
245
def known_privkey():
    """Read the checked-in mock owner private key.

    @return: mock owner private key in PEM format.
    """
    key_path = os.path.join(os.path.dirname(__file__),
                            constants.MOCK_OWNER_KEY)
    return utils.read_file(key_path)
253
254
def known_pubkey():
    """Extract the public key from the checked-in mock owner cert.

    @return: mock owner public key in DER format.
    """
    cert_path = os.path.join(os.path.dirname(__file__),
                             constants.MOCK_OWNER_CERT)
    return cert_extract_pubkey_der(cert_path)
263
264
def pairgen():
    """Create a fresh private key and a self-signed cert for it.

    Runs 'openssl req' to generate a 2048-bit RSA key and a self-signed
    X509 certificate with subject /CN=me.  Both are written in PEM format
    to fixed file names inside the module's scratch directory, and the two
    paths are returned.

    The caller is responsible for cleaning up these files.

    @return: (/path/to/private_key, /path/to/self-signed_cert)
    """
    scratch_dir = scoped_tempfile.tempdir.name
    keyfile = scratch_dir + '/private.key'
    certfile = scratch_dir + '/cert.pem'
    template = ('%(tool)s -x509 -subj %(subject)s -newkey rsa:2048 -nodes '
                '-keyout %(key)s -out %(cert)s')
    system_output_on_fail(template % {'tool': OPENSSLREQ,
                                      'subject': '/CN=me',
                                      'key': keyfile,
                                      'cert': certfile})
    return (keyfile, certfile)
282
283
def pairgen_as_data():
    """Generate a keypair and hand it back as data rather than as files.

    Calls pairgen() for a fresh owner keypair, reads the PEM-encoded
    private key and extracts the DER-encoded public key from the cert.  The
    generated files are wrapped in scoped_tempfile objects so they are
    removed once those wrappers are destroyed.

    @return: (PEM-encoded private key, DER-encoded public key)
    """
    key_path, cert_path = pairgen()
    # Keep both wrappers referenced until the contents have been read.
    key_guard = scoped_tempfile(key_path)
    cert_guard = scoped_tempfile(cert_path)
    private_pem = utils.read_file(key_guard.name)
    public_der = cert_extract_pubkey_der(cert_guard.name)
    return (private_pem, public_der)
297
298
def push_to_nss(keyfile, certfile, nssdb):
    """Import a pre-generated key pair into an NSS DB.

    The PEM key and cert are first bundled into a PKCS#12 file with
    'openssl pkcs12 -export' (empty passwords), and that bundle is then
    imported into the database with pk12util.  The intermediate bundle is
    held in a scoped_tempfile.

    @param keyfile: path to PEM-formatted private key file.
    @param certfile: path to PEM-formatted cert file for associated public key.
    @param nssdb: path to NSSDB to be populated with the provided keys.
    """
    bundle = scoped_tempfile(scoped_tempfile.tempdir.name + '/for_push.p12')

    export_cmd = ('%s -export -in %s -inkey %s -out %s '
                  '-passin pass: -passout pass:') % (
                      OPENSSLP12, certfile, keyfile, bundle.name)
    system_output_on_fail(export_cmd)

    import_cmd = '%s -d "sql:%s" -i %s -W ""' % (PK12UTIL, nssdb, bundle.name)
    system_output_on_fail(import_cmd)
318
319
def cert_extract_pubkey_der(pem):
    """Pull the public key out of a PEM cert and return it DER-encoded.

    Pipes 'openssl x509 -pubkey' into 'openssl rsa -outform DER', writing
    the result to a scratch file whose contents are then read back and
    returned.

    @param pem: path to a PEM-formatted cert file.
    @return: DER-encoded public key from cert, as a string.
    """
    der_file = scoped_tempfile(scoped_tempfile.tempdir.name + '/pubkey.der')
    extract = '%s -in %s -pubkey -noout' % (OPENSSLX509, pem)
    convert = '%s -outform DER -pubin -out %s' % (OPENSSLRSA, der_file.name)
    system_output_on_fail(extract + ' | ' + convert)
    return utils.read_file(der_file.name)
336
337
def sign(pem_key, data):
    """Signs |data| with key from |pem_key|, returns signature.

    Using the PEM-formatted private key in |pem_key|, generates an
    RSA-with-SHA1 signature over |data| by running
    '<OPENSSLCRYPTO> -sign <keyfile>' with |data| on stdin, and returns the
    signature read back from the command's stdout.

    @param pem_key: PEM-formatted private key, as a string.
    @param data: data to be signed.
    @return: signature as a string.
    @raises OwnershipError if the command produced an empty signature.
    @raises whatever utils.run raises if the command fails; the captured
            stderr is logged before re-raising.
    """
    # Anonymous temp files for the command's stdout, stderr and stdin.
    sig = scoped_tempfile()
    err = scoped_tempfile()
    data_file = scoped_tempfile()
    data_file.fo.write(data)
    data_file.fo.seek(0)  # Rewind so the command reads from the start.

    # The key has to be on disk because openssl takes it as a file argument.
    pem_key_file = scoped_tempfile(scoped_tempfile.tempdir.name + '/pkey.pem')
    utils.open_write_close(pem_key_file.name, pem_key)

    cmd = '%s -sign %s' % (OPENSSLCRYPTO, pem_key_file.name)
    try:
        utils.run(cmd,
                  stdin=data_file.fo,
                  stdout_tee=sig.fo,
                  stderr_tee=err.fo)
    except Exception:
        err.fo.seek(0)
        logging.error(err.fo.read())
        raise

    sig.fo.seek(0)
    sig_data = sig.fo.read()
    if not sig_data:
        # OwnershipError is defined in this module, not looked up on the
        # imported |error| module as the previous code did.
        raise OwnershipError('Empty signature!')
    return sig_data
374
375
def get_user_policy_key_filename(username):
    """Build the path to the policy key file of |username|.

    The file sits under constants.USER_POLICY_DIR, in a directory named
    after the hash that cryptohome.get_user_hash() reports for the user.

    @param username: the user whose policy key we want the path to.
    @return: path to user's policy key file.
    """
    user_hash = cryptohome.get_user_hash(username)
    return os.path.join(constants.USER_POLICY_DIR,
                        user_hash,
                        constants.USER_POLICY_KEY_FILENAME)
385