xref: /aosp_15_r20/external/autotest/utils/frozen_chromite/third_party/oauth2client/crypt.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# -*- coding: utf-8 -*-
2#
3# Copyright 2014 Google Inc. All rights reserved.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16"""Crypto-related routines for oauth2client."""
17
18import json
19import logging
20import time
21
22from oauth2client._helpers import _from_bytes
23from oauth2client._helpers import _json_encode
24from oauth2client._helpers import _to_bytes
25from oauth2client._helpers import _urlsafe_b64decode
26from oauth2client._helpers import _urlsafe_b64encode
27
28
29CLOCK_SKEW_SECS = 300  # 5 minutes in seconds
30AUTH_TOKEN_LIFETIME_SECS = 300  # 5 minutes in seconds
31MAX_TOKEN_LIFETIME_SECS = 86400  # 1 day in seconds
32
33logger = logging.getLogger(__name__)
34
35
36class AppIdentityError(Exception):
37    """Error to indicate crypto failure."""
38
39
40def _bad_pkcs12_key_as_pem(*args, **kwargs):
41    raise NotImplementedError('pkcs12_key_as_pem requires OpenSSL.')
42
43
44try:
45    from oauth2client._openssl_crypt import OpenSSLVerifier
46    from oauth2client._openssl_crypt import OpenSSLSigner
47    from oauth2client._openssl_crypt import pkcs12_key_as_pem
48except ImportError:  # pragma: NO COVER
49    OpenSSLVerifier = None
50    OpenSSLSigner = None
51    pkcs12_key_as_pem = _bad_pkcs12_key_as_pem
52
53try:
54    from oauth2client._pycrypto_crypt import PyCryptoVerifier
55    from oauth2client._pycrypto_crypt import PyCryptoSigner
56except ImportError:  # pragma: NO COVER
57    PyCryptoVerifier = None
58    PyCryptoSigner = None
59
60
61if OpenSSLSigner:
62    Signer = OpenSSLSigner
63    Verifier = OpenSSLVerifier
64elif PyCryptoSigner:  # pragma: NO COVER
65    Signer = PyCryptoSigner
66    Verifier = PyCryptoVerifier
67else:  # pragma: NO COVER
68    raise ImportError('No encryption library found. Please install either '
69                      'PyOpenSSL, or PyCrypto 2.6 or later')
70
71
72def make_signed_jwt(signer, payload):
73    """Make a signed JWT.
74
75    See http://self-issued.info/docs/draft-jones-json-web-token.html.
76
77    Args:
78        signer: crypt.Signer, Cryptographic signer.
79        payload: dict, Dictionary of data to convert to JSON and then sign.
80
81    Returns:
82        string, The JWT for the payload.
83    """
84    header = {'typ': 'JWT', 'alg': 'RS256'}
85
86    segments = [
87      _urlsafe_b64encode(_json_encode(header)),
88      _urlsafe_b64encode(_json_encode(payload)),
89    ]
90    signing_input = b'.'.join(segments)
91
92    signature = signer.sign(signing_input)
93    segments.append(_urlsafe_b64encode(signature))
94
95    logger.debug(str(segments))
96
97    return b'.'.join(segments)
98
99
100def _verify_signature(message, signature, certs):
101    """Verifies signed content using a list of certificates.
102
103    Args:
104        message: string or bytes, The message to verify.
105        signature: string or bytes, The signature on the message.
106        certs: iterable, certificates in PEM format.
107
108    Raises:
109        AppIdentityError: If none of the certificates can verify the message
110                          against the signature.
111    """
112    for pem in certs:
113        verifier = Verifier.from_string(pem, is_x509_cert=True)
114        if verifier.verify(message, signature):
115            return
116
117    # If we have not returned, no certificate confirms the signature.
118    raise AppIdentityError('Invalid token signature')
119
120
121def _check_audience(payload_dict, audience):
122    """Checks audience field from a JWT payload.
123
124    Does nothing if the passed in ``audience`` is null.
125
126    Args:
127        payload_dict: dict, A dictionary containing a JWT payload.
128        audience: string or NoneType, an audience to check for in
129                  the JWT payload.
130
131    Raises:
132        AppIdentityError: If there is no ``'aud'`` field in the payload
133                          dictionary but there is an ``audience`` to check.
134        AppIdentityError: If the ``'aud'`` field in the payload dictionary
135                          does not match the ``audience``.
136    """
137    if audience is None:
138        return
139
140    audience_in_payload = payload_dict.get('aud')
141    if audience_in_payload is None:
142        raise AppIdentityError('No aud field in token: %s' %
143                               (payload_dict,))
144    if audience_in_payload != audience:
145        raise AppIdentityError('Wrong recipient, %s != %s: %s' %
146                               (audience_in_payload, audience, payload_dict))
147
148
149def _verify_time_range(payload_dict):
150    """Verifies the issued at and expiration from a JWT payload.
151
152    Makes sure the current time (in UTC) falls between the issued at and
153    expiration for the JWT (with some skew allowed for via
154    ``CLOCK_SKEW_SECS``).
155
156    Args:
157        payload_dict: dict, A dictionary containing a JWT payload.
158
159    Raises:
160        AppIdentityError: If there is no ``'iat'`` field in the payload
161                          dictionary.
162        AppIdentityError: If there is no ``'exp'`` field in the payload
163                          dictionary.
164        AppIdentityError: If the JWT expiration is too far in the future (i.e.
165                          if the expiration would imply a token lifetime
166                          longer than what is allowed.)
167        AppIdentityError: If the token appears to have been issued in the
168                          future (up to clock skew).
169        AppIdentityError: If the token appears to have expired in the past
170                          (up to clock skew).
171    """
172    # Get the current time to use throughout.
173    now = int(time.time())
174
175    # Make sure issued at and expiration are in the payload.
176    issued_at = payload_dict.get('iat')
177    if issued_at is None:
178        raise AppIdentityError('No iat field in token: %s' % (payload_dict,))
179    expiration = payload_dict.get('exp')
180    if expiration is None:
181        raise AppIdentityError('No exp field in token: %s' % (payload_dict,))
182
183    # Make sure the expiration gives an acceptable token lifetime.
184    if expiration >= now + MAX_TOKEN_LIFETIME_SECS:
185        raise AppIdentityError('exp field too far in future: %s' %
186                               (payload_dict,))
187
188    # Make sure (up to clock skew) that the token wasn't issued in the future.
189    earliest = issued_at - CLOCK_SKEW_SECS
190    if now < earliest:
191        raise AppIdentityError('Token used too early, %d < %d: %s' %
192                               (now, earliest, payload_dict))
193    # Make sure (up to clock skew) that the token isn't already expired.
194    latest = expiration + CLOCK_SKEW_SECS
195    if now > latest:
196        raise AppIdentityError('Token used too late, %d > %d: %s' %
197                               (now, latest, payload_dict))
198
199
200def verify_signed_jwt_with_certs(jwt, certs, audience=None):
201    """Verify a JWT against public certs.
202
203    See http://self-issued.info/docs/draft-jones-json-web-token.html.
204
205    Args:
206        jwt: string, A JWT.
207        certs: dict, Dictionary where values of public keys in PEM format.
208        audience: string, The audience, 'aud', that this JWT should contain. If
209                  None then the JWT's 'aud' parameter is not verified.
210
211    Returns:
212        dict, The deserialized JSON payload in the JWT.
213
214    Raises:
215        AppIdentityError: if any checks are failed.
216    """
217    jwt = _to_bytes(jwt)
218
219    if jwt.count(b'.') != 2:
220        raise AppIdentityError(
221            'Wrong number of segments in token: %s' % (jwt,))
222
223    header, payload, signature = jwt.split(b'.')
224    message_to_sign = header + b'.' + payload
225    signature = _urlsafe_b64decode(signature)
226
227    # Parse token.
228    payload_bytes = _urlsafe_b64decode(payload)
229    try:
230        payload_dict = json.loads(_from_bytes(payload_bytes))
231    except:
232        raise AppIdentityError('Can\'t parse token: %s' % (payload_bytes,))
233
234    # Verify that the signature matches the message.
235    _verify_signature(message_to_sign, signature, certs.values())
236
237    # Verify the issued at and created times in the payload.
238    _verify_time_range(payload_dict)
239
240    # Check audience.
241    _check_audience(payload_dict, audience)
242
243    return payload_dict
244