1# Copyright 2021 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13"""The JwtValidator.""" 14 15import datetime 16 17from typing import Optional 18from tink.jwt import _jwt_error 19from tink.jwt import _raw_jwt 20 21_MAX_CLOCK_SKEW = datetime.timedelta(minutes=10) 22 23 24class JwtValidator: 25 """A JwtValidator defines how JSON Web Tokens (JWTs) should be validated. 26 27 By default, the JwtValidator requires that a token has a valid expiration 28 claim, no issuer and no audience claim. This can be changed using the 29 expect_... and ignore_... arguments. 30 31 If present, the JwtValidator also validates the not-before claim. The 32 validation time can be changed using the fixed_now parameter. clock_skew can 33 be set to allow a small leeway (not more than 10 minutes) to account for 34 clock skew. 35 """ 36 37 def __init__(self, 38 *, 39 expected_type_header: Optional[str], 40 expected_issuer: Optional[str], 41 expected_audience: Optional[str], 42 ignore_type_header: bool, 43 ignore_issuer: bool, 44 ignore_audiences: bool, 45 allow_missing_expiration: bool, 46 expect_issued_in_the_past: bool, 47 clock_skew: Optional[datetime.timedelta], 48 fixed_now: Optional[datetime.datetime]) -> None: 49 if expected_type_header and ignore_type_header: 50 raise ValueError( 51 'expected_type_header and ignore_type_header cannot be used together') 52 if expected_issuer and ignore_issuer: 53 raise ValueError( 54 'expected_issuer and ignore_issuer cannot be used together') 55 if expected_audience and ignore_audiences: 56 raise ValueError( 57 'expected_audience and ignore_audiences cannot be used together') 58 self._expected_type_header = expected_type_header 59 self._expected_issuer = expected_issuer 60 self._expected_audience = expected_audience 61 self._ignore_type_header = ignore_type_header 62 self._ignore_issuer = ignore_issuer 63 self._ignore_audiences = ignore_audiences 64 self._allow_missing_expiration = allow_missing_expiration 65 self._expect_issued_in_the_past = expect_issued_in_the_past 66 if clock_skew: 67 if clock_skew > _MAX_CLOCK_SKEW: 68 raise ValueError('clock skew too large, max is 10 minutes') 69 self._clock_skew = clock_skew 70 else: 71 self._clock_skew = datetime.timedelta() 72 if fixed_now and not fixed_now.tzinfo: 73 raise ValueError('fixed_now without tzinfo') 74 self._fixed_now = fixed_now 75 76 def has_expected_type_header(self) -> bool: 77 return self._expected_type_header is not None 78 79 def expected_type_header(self) -> str: 80 return self._expected_type_header 81 82 def has_expected_issuer(self) -> bool: 83 return self._expected_issuer is not None 84 85 def expected_issuer(self) -> str: 86 return self._expected_issuer 87 88 def has_expected_audience(self) -> bool: 89 return self._expected_audience is not None 90 91 def expected_audience(self) -> str: 92 return self._expected_audience 93 94 def ignore_type_header(self) -> bool: 95 return self._ignore_type_header 96 97 def ignore_issuer(self) -> bool: 98 return self._ignore_issuer 99 100 def ignore_audiences(self) -> bool: 101 return self._ignore_audiences 102 103 def allow_missing_expiration(self) -> bool: 104 return self._allow_missing_expiration 105 106 def expect_issued_in_the_past(self) -> bool: 107 return self._expect_issued_in_the_past 108 109 def clock_skew(self) -> datetime.timedelta: 110 return self._clock_skew 111 112 def has_fixed_now(self) -> bool: 113 return self._fixed_now is not None 114 115 def fixed_now(self) -> datetime.datetime: 116 return self._fixed_now 117 118 119def new_validator( 120 *, 121 expected_type_header: Optional[str] = None, 122 expected_issuer: Optional[str] = None, 123 expected_audience: Optional[str] = None, 124 ignore_type_header: bool = False, 125 ignore_issuer: bool = False, 126 ignore_audiences: bool = False, 127 allow_missing_expiration: bool = False, 128 expect_issued_in_the_past: bool = False, 129 clock_skew: Optional[datetime.timedelta] = None, 130 fixed_now: Optional[datetime.datetime] = None) -> JwtValidator: 131 """Creates a new JwtValidator.""" 132 return JwtValidator( 133 expected_type_header=expected_type_header, 134 expected_issuer=expected_issuer, 135 expected_audience=expected_audience, 136 ignore_type_header=ignore_type_header, 137 ignore_issuer=ignore_issuer, 138 ignore_audiences=ignore_audiences, 139 allow_missing_expiration=allow_missing_expiration, 140 expect_issued_in_the_past=expect_issued_in_the_past, 141 clock_skew=clock_skew, 142 fixed_now=fixed_now) 143 144 145def validate(validator: JwtValidator, raw_jwt: _raw_jwt.RawJwt) -> None: 146 """Validates a jwt.RawJwt and raises JwtInvalidError if it is invalid. 147 148 This function is called by the JWT primitives and does not need to be called 149 by the user. 150 151 Args: 152 validator: a jwt.JwtValidator that defines how to validate tokens. 153 raw_jwt: a jwt.RawJwt token to validate. 154 Raises: 155 jwt.JwtInvalidError 156 """ 157 if validator.has_fixed_now(): 158 now = validator.fixed_now() 159 else: 160 now = datetime.datetime.now(tz=datetime.timezone.utc) 161 if not raw_jwt.has_expiration() and not validator.allow_missing_expiration(): 162 raise _jwt_error.JwtInvalidError('token is missing an expiration') 163 if (raw_jwt.has_expiration() and 164 raw_jwt.expiration() <= now - validator.clock_skew()): 165 raise _jwt_error.JwtInvalidError('token has expired since %s' % 166 raw_jwt.expiration()) 167 if (raw_jwt.has_not_before() and 168 raw_jwt.not_before() > now + validator.clock_skew()): 169 raise _jwt_error.JwtInvalidError('token cannot be used before %s' % 170 raw_jwt.not_before()) 171 if validator.expect_issued_in_the_past(): 172 if not raw_jwt.has_issued_at(): 173 raise _jwt_error.JwtInvalidError('token is missing iat claim') 174 if raw_jwt.issued_at() > now + validator.clock_skew(): 175 raise _jwt_error.JwtInvalidError( 176 'token has a invalid iat claim in the future: %s' % 177 raw_jwt.issued_at()) 178 if validator.has_expected_type_header(): 179 if not raw_jwt.has_type_header(): 180 raise _jwt_error.JwtInvalidError( 181 'invalid JWT; missing expected type header %s.' % 182 validator.expected_type_header()) 183 if validator.expected_type_header() != raw_jwt.type_header(): 184 raise _jwt_error.JwtInvalidError( 185 'invalid JWT; expected type header %s, but got %s' % 186 (validator.expected_type_header(), raw_jwt.type_header())) 187 else: 188 if raw_jwt.has_type_header() and not validator.ignore_type_header(): 189 raise _jwt_error.JwtInvalidError( 190 'invalid JWT; token has type_header set, but validator not.') 191 if validator.has_expected_issuer(): 192 if not raw_jwt.has_issuer(): 193 raise _jwt_error.JwtInvalidError( 194 'invalid JWT; missing expected issuer %s.' % 195 validator.expected_issuer()) 196 if validator.expected_issuer() != raw_jwt.issuer(): 197 raise _jwt_error.JwtInvalidError( 198 'invalid JWT; expected issuer %s, but got %s' % 199 (validator.expected_issuer(), raw_jwt.issuer())) 200 else: 201 if raw_jwt.has_issuer() and not validator.ignore_issuer(): 202 raise _jwt_error.JwtInvalidError( 203 'invalid JWT; token has issuer set, but validator not.') 204 if validator.has_expected_audience(): 205 if (not raw_jwt.has_audiences() or 206 validator.expected_audience() not in raw_jwt.audiences()): 207 raise _jwt_error.JwtInvalidError( 208 'invalid JWT; missing expected audience %s.' % 209 validator.expected_audience()) 210 else: 211 if raw_jwt.has_audiences() and not validator.ignore_audiences(): 212 raise _jwt_error.JwtInvalidError( 213 'invalid JWT; token has audience set, but validator not.') 214