xref: /aosp_15_r20/external/autotest/client/cros/ownership.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1*9c5db199SXin Li# Lint as: python2, python3
2*9c5db199SXin Li# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3*9c5db199SXin Li# Use of this source code is governed by a BSD-style license that can be
4*9c5db199SXin Li# found in the LICENSE file.
5*9c5db199SXin Li
6*9c5db199SXin Liimport logging, os, shutil, tempfile
7*9c5db199SXin Li
8*9c5db199SXin Liimport common
9*9c5db199SXin Li
10*9c5db199SXin Lifrom autotest_lib.client.bin import utils
11*9c5db199SXin Lifrom autotest_lib.client.common_lib import autotemp, error
12*9c5db199SXin Lifrom autotest_lib.client.cros import constants
13*9c5db199SXin Lifrom autotest_lib.client.cros import cros_ui
14*9c5db199SXin Lifrom autotest_lib.client.cros import cryptohome
15*9c5db199SXin Li
16*9c5db199SXin Li
# Names of the external command-line tools (and openssl subcommands) that the
# helpers in this module interpolate into shell command strings.
PK12UTIL = 'pk12util'
CERTUTIL = 'certutil'
OPENSSLP12 = 'openssl pkcs12'
OPENSSLX509 = 'openssl x509'
OPENSSLRSA = 'openssl rsa'
OPENSSLREQ = 'openssl req'
OPENSSLCRYPTO = 'openssl sha1'

# Presumably the credentials of the account used by ownership tests.
# NOTE(review): the TESTUSER value looks like an e-mail obfuscation
# placeholder rather than a real address -- confirm against upstream.
TESTUSER = '[email protected]'
TESTPASS = 'testme'
27*9c5db199SXin Li
28*9c5db199SXin Li
class OwnershipError(error.TestError):
    """Raised by this module when an ownership-related check fails."""
32*9c5db199SXin Li
33*9c5db199SXin Li
class scoped_tempfile(object):
    """A wrapper that provides scoped semantics for temporary files.

    Providing a file path causes the scoped_tempfile to take ownership of the
    file at the provided path.  The file at the path will be deleted when this
    object goes out of scope.  If no path is provided, then a temporary file
    object will be created for the lifetime of the scoped_tempfile and is
    available as the |fo| attribute.

    autotemp.tempfile objects don't seem to play nicely with being
    used in system commands, so they can't be used for my purposes.
    """

    # Class-level scratch directory object; functions in this module build
    # file paths under tempdir.name.
    tempdir = autotemp.tempdir(unique_id='ownership')

    def __init__(self, name=None):
        """Take ownership of |name|, or create an anonymous temp file.

        @param name: path of an existing (or soon to exist) file to delete
                     on destruction; if falsy, a tempfile.TemporaryFile is
                     created instead.
        """
        self.name = name
        # Always define |fo| so that __del__ never trips over a missing
        # attribute, e.g. when TemporaryFile() below raises.
        self.fo = None
        if not self.name:
            self.fo = tempfile.TemporaryFile()


    def __del__(self):
        """Delete the owned file, or close the anonymous temp file."""
        if self.name:
            if os.path.exists(self.name):
                os.unlink(self.name)
        elif self.fo is not None:
            self.fo.close()  # Will destroy the underlying tempfile
60*9c5db199SXin Li
61*9c5db199SXin Li
def system_output_on_fail(cmd):
    """Run a |cmd|, capturing output and logging details only on error.

    On failure the exception raised by utils.system_output() is logged
    together with the command and then re-raised unchanged.

    @param cmd: the command to run.
    @return: the value returned by utils.system_output(cmd).
    """
    try:
        return utils.system_output(cmd)
    except Exception as err:
        # Nothing was captured locally when the call raises, so the
        # exception itself is the only source of failure detail.
        logging.error('Command %r failed: %s', cmd, err)
        raise
73*9c5db199SXin Li
74*9c5db199SXin Li
def __unlink(filename):
    """Remove |filename|, logging instead of raising if that fails.

    Any IOError or OSError from os.unlink() is logged at info level and
    swallowed, so calling this on a missing file is harmless.

    @param filename: path of the file to try to remove.
    """
    try:
        os.unlink(filename)
    except (OSError, IOError) as unlink_failure:
        logging.info(unlink_failure)
        return
86*9c5db199SXin Li
87*9c5db199SXin Li
def restart_ui_to_clear_ownership_files():
    """Stop the UI, wipe on-disk ownership state, then start the UI again.

    The UI must be stopped while the files are removed, or the
    session_manager will write the policy and key files out again.
    Stopping is allowed to fail only when the UI was not up to begin with.
    """
    ui_was_up = cros_ui.is_up()
    cros_ui.stop(allow_fail=not ui_was_up)
    clear_ownership_files_no_restart()
    cros_ui.start()
97*9c5db199SXin Li
98*9c5db199SXin Li
def clear_ownership_files_no_restart():
    """Delete the owner key, signed policy and 'Local State' files.

    The UI must already be stopped, or the session_manager will write the
    policy and key files out again.

    @raises error.TestError if the UI is running.
    """
    if cros_ui.is_up():
        raise error.TestError("Tried to clear ownership with UI running.")
    ownership_state_files = (
        constants.OWNER_KEY_FILE,
        constants.SIGNED_POLICY_FILE,
        os.path.join(constants.USER_DATA_DIR, 'Local State'),
    )
    for state_file in ownership_state_files:
        __unlink(state_file)
110*9c5db199SXin Li
111*9c5db199SXin Li
def fake_ownership():
    """Fake ownership by installing the checked-in mock key and policy.

    Writes the DER public key extracted from the mock owner cert to
    constants.OWNER_KEY_FILE and copies the mock signed policy to
    constants.SIGNED_POLICY_FILE.
    """
    # The mock files are located relative to this module's directory.
    module_dir = os.path.dirname(__file__)
    cert_path = os.path.join(module_dir, constants.MOCK_OWNER_CERT)
    policy_path = os.path.join(module_dir, constants.MOCK_OWNER_POLICY)

    pubkey_der = cert_extract_pubkey_der(cert_path)
    utils.open_write_close(constants.OWNER_KEY_FILE, pubkey_der)
    shutil.copy(policy_path, constants.SIGNED_POLICY_FILE)
123*9c5db199SXin Li
124*9c5db199SXin Li
# The policy_type value that assert_has_device_settings() requires a
# PolicyData protobuf to carry.
POLICY_TYPE = 'google/chromeos/device'
126*9c5db199SXin Li
127*9c5db199SXin Li
def assert_has_policy_data(response_proto):
    """Check that |response_proto| carries a policy_data field.

    @param response_proto: a PolicyFetchResponse protobuf.
    @raises OwnershipError if the policy_data field is absent.
    """
    if response_proto.HasField("policy_data"):
        return
    raise OwnershipError('Malformatted response.')
136*9c5db199SXin Li
137*9c5db199SXin Li
def assert_has_device_settings(data_proto):
    """Check that |data_proto| is a device policy that contains settings.

    The protobuf must have a policy_type equal to POLICY_TYPE and must
    have a policy_value field.

    @param data_proto: a PolicyData protobuf.
    @raises OwnershipError if this isn't CrOS policy, or has no settings inside.
    """
    is_device_policy = (data_proto.HasField("policy_type") and
                        data_proto.policy_type == POLICY_TYPE)
    if is_device_policy and data_proto.HasField("policy_value"):
        return
    raise OwnershipError('Malformatted response.')
148*9c5db199SXin Li
149*9c5db199SXin Li
def assert_username(data_proto, username):
    """Check that |data_proto| is a policy belonging to |username|.

    @param data_proto: a PolicyData protobuf.
    @param username: the username the policy is expected to carry.
    @raises OwnershipError if data_proto.username differs from username.
    """
    if data_proto.username == username:
        return
    raise OwnershipError('Incorrect username.')
159*9c5db199SXin Li
160*9c5db199SXin Li
def assert_guest_setting(settings, guests):
    """Check the guest-mode setting stored in |settings|.

    Both the guest_mode_enabled sub-message and the field of the same name
    inside it must be present, and the value must equal |guests|.

    @param settings: a ChromeDeviceSettingsProto protobuf.
    @param guests: boolean indicating whether guests are allowed to sign in.
    @raises OwnershipError if settings doesn't enforce the provided setting.
    """
    field = "guest_mode_enabled"
    if not settings.HasField(field):
        raise OwnershipError('No guest mode setting protobuf.')
    guest_proto = settings.guest_mode_enabled
    if not guest_proto.HasField(field):
        raise OwnershipError('No guest mode setting.')
    if guest_proto.guest_mode_enabled != guests:
        raise OwnershipError('Incorrect guest mode setting.')
174*9c5db199SXin Li
175*9c5db199SXin Li
def assert_show_users(settings, show_users):
    """Check the show-user-names setting stored in |settings|.

    Both the show_user_names sub-message and the field of the same name
    inside it must be present, and the value must equal |show_users|.

    @param settings: a ChromeDeviceSettingsProto protobuf.
    @param show_users: boolean indicating whether avatars are shown on sign in.
    @raises OwnershipError if settings doesn't enforce the provided setting.
    """
    field = "show_user_names"
    if not settings.HasField(field):
        raise OwnershipError('No show users setting protobuf.')
    show_proto = settings.show_user_names
    if not show_proto.HasField(field):
        raise OwnershipError('No show users setting.')
    if show_proto.show_user_names != show_users:
        raise OwnershipError('Incorrect show users setting.')
189*9c5db199SXin Li
190*9c5db199SXin Li
def assert_roaming(settings, roaming):
    """Check the data-roaming setting stored in |settings|.

    Both the data_roaming_enabled sub-message and the field of the same
    name inside it must be present, and the value must equal |roaming|.

    @param settings: a ChromeDeviceSettingsProto protobuf.
    @param roaming: boolean indicating whether roaming is allowed.
    @raises OwnershipError if settings doesn't enforce the provided setting.
    """
    field = "data_roaming_enabled"
    if not settings.HasField(field):
        raise OwnershipError('No roaming setting protobuf.')
    roaming_proto = settings.data_roaming_enabled
    if not roaming_proto.HasField(field):
        raise OwnershipError('No roaming setting.')
    if roaming_proto.data_roaming_enabled != roaming:
        raise OwnershipError('Incorrect roaming setting.')
204*9c5db199SXin Li
205*9c5db199SXin Li
def assert_new_users(settings, new_users):
    """Check the allow-new-users setting stored in |settings|.

    Both the allow_new_users sub-message and the field of the same name
    inside it must be present, and the value must equal |new_users|.

    @param settings: a ChromeDeviceSettingsProto protobuf.
    @param new_users: boolean indicating whether adding users is allowed.
    @raises OwnershipError if settings doesn't enforce the provided setting.
    """
    field = "allow_new_users"
    if not settings.HasField(field):
        raise OwnershipError('No allow new users setting protobuf.')
    new_users_proto = settings.allow_new_users
    if not new_users_proto.HasField(field):
        raise OwnershipError('No allow new users setting.')
    if new_users_proto.allow_new_users != new_users:
        raise OwnershipError('Incorrect allow new users setting.')
219*9c5db199SXin Li
220*9c5db199SXin Li
def __user_nssdb(user):
    """Build the path of |user|'s NSSDB.

    The path is '.pki/nssdb' below whatever cryptohome.user_path(user)
    returns.

    @param user: the user whose NSSDB the caller wants.
    @return: path to user's NSSDB.
    """
    user_home = cryptohome.user_path(user)
    return os.path.join(user_home, '.pki', 'nssdb')
228*9c5db199SXin Li
229*9c5db199SXin Li
def use_known_ownerkeys(user):
    """Sets the system up to use a well-known keypair for owner operations.

    Assuming the appropriate cryptohome is already mounted, pushes the
    checked-in 'mock' owner key and cert into the user's NSSDB and writes
    the matching DER public key to constants.OWNER_KEY_FILE.

    @param user: the user whose NSSDB should be populated with key material.
    """
    module_dir = os.path.dirname(__file__)
    key_path = os.path.join(module_dir, constants.MOCK_OWNER_KEY)
    cert_path = os.path.join(module_dir, constants.MOCK_OWNER_CERT)

    push_to_nss(key_path, cert_path, __user_nssdb(user))

    pubkey_der = cert_extract_pubkey_der(cert_path)
    utils.open_write_close(constants.OWNER_KEY_FILE, pubkey_der)
244*9c5db199SXin Li
245*9c5db199SXin Li
def known_privkey():
    """Read the checked-in mock owner private key.

    @return: contents of the mock owner private key file (PEM format).
    """
    key_path = os.path.join(os.path.dirname(__file__),
                            constants.MOCK_OWNER_KEY)
    return utils.read_file(key_path)
253*9c5db199SXin Li
254*9c5db199SXin Li
def known_pubkey():
    """Extract the public key from the checked-in mock owner cert.

    @return: mock owner public key in DER format.
    """
    cert_path = os.path.join(os.path.dirname(__file__),
                             constants.MOCK_OWNER_CERT)
    return cert_extract_pubkey_der(cert_path)
263*9c5db199SXin Li
264*9c5db199SXin Li
def pairgen():
    """Generate a self-signed cert and associated private key.

    Runs 'openssl req' to create a 2048-bit RSA private key and a
    self-signed X509 certificate with subject /CN=me.  Both are written
    under scoped_tempfile.tempdir as 'private.key' and 'cert.pem'.

    The caller is responsible for cleaning up these files.

    @return: (/path/to/private_key, /path/to/self-signed_cert)
    """
    scratch_dir = scoped_tempfile.tempdir.name
    keyfile = scratch_dir + '/private.key'
    certfile = scratch_dir + '/cert.pem'
    cmd_args = (OPENSSLREQ, '/CN=me', keyfile, certfile)
    cmd = ('%s -x509 -subj %s -newkey rsa:2048 -nodes -keyout %s -out %s'
           % cmd_args)
    system_output_on_fail(cmd)
    return (keyfile, certfile)
282*9c5db199SXin Li
283*9c5db199SXin Li
def pairgen_as_data():
    """Generates keypair, returns keys as data.

    Calls pairgen() and hands both generated files to scoped_tempfile
    objects so they are deleted when those objects are destroyed, then
    reads the key material back.

    @return: (PEM-encoded private key, DER-encoded public key)
    """
    key_path, cert_path = pairgen()
    # The wrappers own the on-disk files from here on.
    key_owner = scoped_tempfile(key_path)
    cert_owner = scoped_tempfile(cert_path)
    private_pem = utils.read_file(key_owner.name)
    public_der = cert_extract_pubkey_der(cert_owner.name)
    return (private_pem, public_der)
297*9c5db199SXin Li
298*9c5db199SXin Li
def push_to_nss(keyfile, certfile, nssdb):
    """Takes a pre-generated key pair and pushes them to an NSS DB.

    The key and cert are first bundled into a temporary PKCS#12 file with
    'openssl pkcs12' (empty passwords), which is then imported into the
    sql-format |nssdb| with pk12util.

    @param keyfile: path to PEM-formatted private key file.
    @param certfile: path to PEM-formatted cert file for associated public key.
    @param nssdb: path to NSSDB to be populated with the provided keys.
    """
    bundle = scoped_tempfile(scoped_tempfile.tempdir.name + '/for_push.p12')

    export_cmd = '%s -export -in %s -inkey %s -out %s ' % (
        OPENSSLP12, certfile, keyfile, bundle.name)
    export_cmd = export_cmd + '-passin pass: -passout pass:'
    system_output_on_fail(export_cmd)

    import_cmd = '%s -d "sql:%s" -i %s -W ""' % (PK12UTIL, nssdb, bundle.name)
    system_output_on_fail(import_cmd)
318*9c5db199SXin Li
319*9c5db199SXin Li
def cert_extract_pubkey_der(pem):
    """Given a PEM-formatted cert, extracts the public key in DER format.

    Pipes 'openssl x509 -pubkey' into 'openssl rsa -outform DER', writing
    the result to a temporary file whose contents are returned.

    @param pem: path to a PEM-formatted cert file.
    @return: DER-encoded public key from cert, as read by utils.read_file.
    """
    der_file = scoped_tempfile(scoped_tempfile.tempdir.name + '/pubkey.der')
    extract_stage = '%s -in %s -pubkey -noout ' % (OPENSSLX509, pem)
    convert_stage = '| %s -outform DER -pubin -out %s' % (OPENSSLRSA,
                                                          der_file.name)
    system_output_on_fail(extract_stage + convert_stage)
    return utils.read_file(der_file.name)
336*9c5db199SXin Li
337*9c5db199SXin Li
def sign(pem_key, data):
    """Signs |data| with key from |pem_key|, returns signature.

    Using the PEM-formatted private key in |pem_key|, generates an
    RSA-with-SHA1 signature over |data| and returns the signature in
    a string.

    @param pem_key: PEM-formatted private key, as a string.
    @param data: data to be signed.
    @return: signature as a string.
    @raises OwnershipError if the signing command produced no output.
    """
    # Anonymous temp files carry stdin/stdout/stderr of the openssl command.
    sig = scoped_tempfile()
    err = scoped_tempfile()
    data_file = scoped_tempfile()
    data_file.fo.write(data)
    data_file.fo.seek(0)  # Rewind so the command reads from the start.

    # openssl needs the key as a file on disk; it is removed again when
    # pem_key_file is destroyed.
    pem_key_file = scoped_tempfile(scoped_tempfile.tempdir.name + '/pkey.pem')
    utils.open_write_close(pem_key_file.name, pem_key)

    cmd = '%s -sign %s' % (OPENSSLCRYPTO, pem_key_file.name)
    try:
        utils.run(cmd,
                  stdin=data_file.fo,
                  stdout_tee=sig.fo,
                  stderr_tee=err.fo)
    except Exception:
        # Surface whatever the command wrote to stderr before re-raising.
        err.fo.seek(0)
        logging.error(err.fo.read())
        raise

    sig.fo.seek(0)
    sig_data = sig.fo.read()
    if not sig_data:
        # OwnershipError is defined in this module, not in the error module.
        raise OwnershipError('Empty signature!')
    return sig_data
374*9c5db199SXin Li
375*9c5db199SXin Li
def get_user_policy_key_filename(username):
    """Build the path of the user policy key file for |username|.

    The file lives under constants.USER_POLICY_DIR in a directory named
    after cryptohome.get_user_hash(username).

    @param username: the user whose policy key we want the path to.
    @return: path to user's policy key file.
    """
    user_hash = cryptohome.get_user_hash(username)
    return os.path.join(constants.USER_POLICY_DIR,
                        user_hash,
                        constants.USER_POLICY_KEY_FILENAME)
385