xref: /aosp_15_r20/external/autotest/utils/frozen_chromite/third_party/oauth2client/service_account.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Copyright 2014 Google Inc. All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""A service account credentials class.
16
This credentials class is implemented on top of the rsa library.
18"""
19
20import base64
21import time
22
23from pyasn1.codec.ber import decoder
24from pyasn1_modules.rfc5208 import PrivateKeyInfo
25import rsa
26
27from oauth2client import GOOGLE_REVOKE_URI
28from oauth2client import GOOGLE_TOKEN_URI
29from oauth2client._helpers import _json_encode
30from oauth2client._helpers import _to_bytes
31from oauth2client._helpers import _urlsafe_b64encode
32from oauth2client import util
33from oauth2client.client import AssertionCredentials
34
35
class _ServiceAccountCredentials(AssertionCredentials):
    """Service account credential backed by a self-signed JWT assertion.

    The assertion is an RS256-signed JWT whose signature is produced with
    the ``rsa`` library from the PKCS#8 private key given at construction.
    """

    # Offset, in seconds, added to "iat" to obtain the "exp" claim (1 hour).
    MAX_TOKEN_LIFETIME_SECS = 3600

    def __init__(self, service_account_id, service_account_email,
                 private_key_id, private_key_pkcs8_text, scopes,
                 user_agent=None, token_uri=GOOGLE_TOKEN_URI,
                 revoke_uri=GOOGLE_REVOKE_URI, **kwargs):
        """Store the account identity and parse the private key.

        Args:
            service_account_id: Reported as 'client_id' in
                ``serialization_data``.
            service_account_email: Used as the 'iss' claim of the assertion
                and reported as 'client_email' in ``serialization_data``.
            private_key_id: Used as the 'kid' header of the assertion and
                returned alongside signatures by ``sign_blob``.
            private_key_pkcs8_text: PEM text of the private key; it is
                parsed by ``_get_private_key`` and also kept verbatim.
            scopes: Scopes, normalised through ``util.scopes_to_string``.
            user_agent: Forwarded to the parent constructor.
            token_uri: Forwarded to the parent constructor and used as the
                'aud' claim of the assertion.
            revoke_uri: Forwarded to the parent constructor.
            **kwargs: Extra entries merged into the assertion payload.
        """
        parent = super(_ServiceAccountCredentials, self)
        parent.__init__(None,
                        user_agent=user_agent,
                        token_uri=token_uri,
                        revoke_uri=revoke_uri)

        # Identity of the account and of the signing key.
        self._service_account_id = service_account_id
        self._service_account_email = service_account_email
        self._private_key_id = private_key_id

        # Keep both the parsed key (for signing) and the original text
        # (for serialization and for building scoped copies).
        self._private_key = _get_private_key(private_key_pkcs8_text)
        self._private_key_pkcs8_text = private_key_pkcs8_text

        self._scopes = util.scopes_to_string(scopes)

        # Remembered so that create_scoped() can rebuild an equivalent
        # credential.
        self._user_agent = user_agent
        self._token_uri = token_uri
        self._revoke_uri = revoke_uri
        self._kwargs = kwargs

    def _generate_assertion(self):
        """Build and sign the JWT assertion used in the token request.

        Returns:
            The encoded header, payload and signature segments joined
            by ``b'.'``.
        """
        issued_at = int(time.time())
        expires_at = (issued_at +
                      _ServiceAccountCredentials.MAX_TOKEN_LIFETIME_SECS)

        jwt_header = {
            'alg': 'RS256',
            'typ': 'JWT',
            'kid': self._private_key_id
        }
        jwt_claims = {
            'aud': self._token_uri,
            'scope': self._scopes,
            'iat': issued_at,
            'exp': expires_at,
            'iss': self._service_account_email
        }
        # Constructor keyword arguments are applied last, so they may add
        # to or replace the claims above.
        jwt_claims.update(self._kwargs)

        encoded_header = _urlsafe_b64encode(_json_encode(jwt_header))
        encoded_claims = _urlsafe_b64encode(_json_encode(jwt_claims))
        signing_input = b'.'.join((encoded_header, encoded_claims))

        # The signature is url-safe base64 with the '=' padding removed.
        raw_signature = rsa.pkcs1.sign(
            signing_input, self._private_key, 'SHA-256')
        encoded_signature = base64.urlsafe_b64encode(raw_signature)
        encoded_signature = encoded_signature.rstrip(b'=')

        return b'.'.join((signing_input, encoded_signature))

    def sign_blob(self, blob):
        """Sign ``blob`` with the private key using SHA-256.

        Args:
            blob: Data to sign; it is converted with ``_to_bytes`` using
                UTF-8 before signing.

        Returns:
            A ``(private_key_id, signature)`` tuple.
        """
        blob_bytes = _to_bytes(blob, encoding='utf-8')
        signature = rsa.pkcs1.sign(blob_bytes, self._private_key, 'SHA-256')
        return (self._private_key_id, signature)

    @property
    def service_account_email(self):
        """The service account email given at construction."""
        return self._service_account_email

    @property
    def serialization_data(self):
        """Dictionary describing this credential, private key included."""
        data = {
            'type': 'service_account',
            'client_id': self._service_account_id,
            'client_email': self._service_account_email,
            'private_key_id': self._private_key_id,
            'private_key': self._private_key_pkcs8_text
        }
        return data

    def create_scoped_required(self):
        """Return True when this credential has no scopes set."""
        if self._scopes:
            return False
        return True

    def create_scoped(self, scopes):
        """Return a new credential identical to this one but for ``scopes``.

        Args:
            scopes: Scopes for the new credential.

        Returns:
            A new ``_ServiceAccountCredentials`` instance.
        """
        identity_args = (
            self._service_account_id,
            self._service_account_email,
            self._private_key_id,
            self._private_key_pkcs8_text,
        )
        return _ServiceAccountCredentials(
            *identity_args,
            scopes=scopes,
            user_agent=self._user_agent,
            token_uri=self._token_uri,
            revoke_uri=self._revoke_uri,
            **self._kwargs)
124
125
def _get_private_key(private_key_pkcs8_text):
    """Build an RSA private key object from PKCS#8 PEM text.

    Args:
        private_key_pkcs8_text: PEM data containing a 'PRIVATE KEY' block.

    Returns:
        The ``rsa.PrivateKey`` loaded from the key material inside the
        decoded ``PrivateKeyInfo`` structure.
    """
    pem_bytes = _to_bytes(private_key_pkcs8_text)

    # Strip the PEM armour to get the DER-encoded PrivateKeyInfo.
    der_bytes = rsa.pem.load_pem(pem_bytes, 'PRIVATE KEY')

    # decode() returns a pair; only the first element is used here.
    key_info, _ = decoder.decode(der_bytes, asn1Spec=PrivateKeyInfo())

    # The 'privateKey' component is loaded as a DER-encoded PKCS#1 key.
    pkcs1_der = key_info.getComponentByName('privateKey').asOctets()
    return rsa.PrivateKey.load_pkcs1(pkcs1_der, format='DER')
134