# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for tink.python.tink.jwt._jwt_format."""

from absl.testing import absltest
from absl.testing import parameterized
from tink.proto import tink_pb2
from tink.jwt import _json_util
from tink.jwt import _jwt_error
from tink.jwt import _jwt_format
from tink.jwt import _raw_jwt


class JwtFormatTest(parameterized.TestCase):
  """Exercises the helpers in `_jwt_format`.

  Covers base64 encoding/decoding, header and payload encoding/decoding,
  header creation and validation (including `kid` handling), and building
  and splitting compact serializations.
  """

  def test_base64_encode_decode_header_fixed_data(self):
    # Example from https://tools.ietf.org/html/rfc7519#section-3.1
    header = bytes([
        123, 34, 116, 121, 112, 34, 58, 34, 74, 87, 84, 34, 44, 13, 10, 32, 34,
        97, 108, 103, 34, 58, 34, 72, 83, 50, 53, 54, 34, 125
    ])
    encoded_header = b'eyJ0eXAiOiJKV1QiLA0KICJhbGciOiJIUzI1NiJ9'
    self.assertEqual(_jwt_format.base64_encode(header), encoded_header)
    self.assertEqual(_jwt_format.base64_decode(encoded_header), header)

  def test_base64_encode_decode_payload_fixed_data(self):
    # Example from https://tools.ietf.org/html/rfc7519#section-3.1
    payload = bytes([
        123, 34, 105, 115, 115, 34, 58, 34, 106, 111, 101, 34, 44, 13, 10, 32,
        34, 101, 120, 112, 34, 58, 49, 51, 48, 48, 56, 49, 57, 51, 56, 48, 44,
        13, 10, 32, 34, 104, 116, 116, 112, 58, 47, 47, 101, 120, 97, 109, 112,
        108, 101, 46, 99, 111, 109, 47, 105, 115, 95, 114, 111, 111, 116, 34,
        58, 116, 114, 117, 101, 125
    ])
    encoded_payload = (b'eyJpc3MiOiJqb2UiLA0KICJleHAiOjEzMDA4MTkzODAsDQogImh0'
                       b'dHA6Ly9leGFtcGxlLmNvbS9pc19yb290Ijp0cnVlfQ')
    self.assertEqual(_jwt_format.base64_encode(payload), encoded_payload)
    self.assertEqual(_jwt_format.base64_decode(encoded_payload), payload)

  def test_base64_decode_bad_format_raises_jwt_invalid_error(self):
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.base64_decode(b'aeyJh')

  def test_base64_decode_fails_with_unknown_chars(self):
    # Sanity check first: the full URL-safe alphabet and the empty input are
    # both accepted.
    self.assertNotEmpty(
        _jwt_format.base64_decode(
            b'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-')
    )
    self.assertEqual(_jwt_format.base64_decode(b''), b'')
    # Each of these single characters must be rejected.
    for bad_input in (b'[', b'@', b'/', b':', b'{'):
      with self.subTest(bad_input=bad_input), self.assertRaises(
          _jwt_error.JwtInvalidError):
        _jwt_format.base64_decode(bad_input)

  def test_decode_encode_header_hs256(self):
    # Example from https://tools.ietf.org/html/rfc7515#appendix-A.1
    encoded_header = b'eyJ0eXAiOiJKV1QiLA0KICJhbGciOiJIUzI1NiJ9'
    json_header = _jwt_format.decode_header(encoded_header)
    header = _json_util.json_loads(json_header)
    self.assertEqual(header['alg'], 'HS256')
    self.assertEqual(header['typ'], 'JWT')
    self.assertEqual(
        _jwt_format.decode_header(_jwt_format.encode_header(json_header)),
        json_header)

  def test_decode_encode_header_rs256(self):
    # Example from https://tools.ietf.org/html/rfc7515#appendix-A.2
    encoded_header = b'eyJhbGciOiJSUzI1NiJ9'
    json_header = _jwt_format.decode_header(encoded_header)
    header = _json_util.json_loads(json_header)
    self.assertEqual(header['alg'], 'RS256')
    self.assertEqual(
        _jwt_format.decode_header(_jwt_format.encode_header(json_header)),
        json_header)

  def test_encode_decode_header(self):
    # The round trip must return the input string unchanged, including its
    # whitespace.
    encoded_header = _jwt_format.encode_header('{ "alg": "RS256"} ')
    json_header = _jwt_format.decode_header(encoded_header)
    self.assertEqual(json_header, '{ "alg": "RS256"} ')

  def test_decode_header_with_invalid_utf8(self):
    encoded_header = _jwt_format.base64_encode(b'{"alg":"RS256", "bad":"\xc2"}')
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.decode_header(encoded_header)

  def test_encode_header_with_utf16_surrogate(self):
    self.assertEqual(
        _jwt_format.encode_header('{"alg": "RS256", "a":"\U0001d11e"}'),
        b'eyJhbGciOiAiUlMyNTYiLCAiYSI6IvCdhJ4ifQ')

  def test_encode_header_with_invalid_utf16_character(self):
    # '\uD834' is a lone surrogate.
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.encode_header('{"alg": "RS256", "a":"\uD834"}')

  @parameterized.parameters([
      'HS256', 'HS384', 'HS512', 'ES256', 'ES384', 'ES512', 'RS256', 'RS384',
      'RS512', 'PS256', 'PS384', 'PS512'
  ])
  def test_create_validate_header(self, algorithm):
    encoded_header = _jwt_format.create_header(algorithm, None, None)
    json_header = _jwt_format.decode_header(encoded_header)
    header = _json_util.json_loads(json_header)
    _jwt_format.validate_header(header, algorithm)
    self.assertIsNone(_jwt_format.get_type_header(header))

  def test_create_header_with_type(self):
    encoded_header = _jwt_format.create_header('HS256', 'typeHeader', None)
    json_header = _jwt_format.decode_header(encoded_header)
    self.assertEqual(json_header, '{"alg":"HS256","typ":"typeHeader"}')
    header = _json_util.json_loads(json_header)
    _jwt_format.validate_header(header, 'HS256')
    self.assertEqual(_jwt_format.get_type_header(header), 'typeHeader')

  def test_create_header_with_type_and_kid(self):
    encoded_header = _jwt_format.create_header('HS256', 'typeHeader', 'GsapRA')
    json_header = _jwt_format.decode_header(encoded_header)
    self.assertEqual(json_header,
                     '{"kid":"GsapRA","alg":"HS256","typ":"typeHeader"}')
    header = _json_util.json_loads(json_header)
    _jwt_format.validate_header(header, 'HS256')
    self.assertEqual(_jwt_format.get_type_header(header), 'typeHeader')

  def test_create_header_with_unknown_alg_fails(self):
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.create_header('unknown', None, None)

  def test_create_verify_different_algorithms_fails(self):
    encoded_header = _jwt_format.create_header('HS256', None, None)
    json_header = _jwt_format.decode_header(encoded_header)
    header = _json_util.json_loads(json_header)
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.validate_header(header, 'ES256')

  def test_verify_empty_header_fails(self):
    header = _json_util.json_loads('{}')
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.validate_header(header, 'ES256')

  def test_validate_header_with_unknown_algorithm_fails(self):
    header = _json_util.json_loads('{"alg":"HS123"}')
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.validate_header(header, 'HS123')

  def test_validate_header_with_unknown_entry_success(self):
    header = _json_util.json_loads('{"alg":"HS256","unknown":"header"}')
    _jwt_format.validate_header(header, 'HS256')

  def test_validate_header_ignores_typ(self):
    header = _json_util.json_loads('{"alg":"HS256","typ":"unknown"}')
    _jwt_format.validate_header(header, 'HS256')

  def test_validate_header_rejects_crit(self):
    header = _json_util.json_loads(
        '{"alg":"HS256","crit":["http://example.invalid/UNDEFINED"],'
        '"http://example.invalid/UNDEFINED":true}')
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.validate_header(header, 'HS256')

  def test_validate_header_with_kid_success(self):
    json_header = '{"kid":"GsapRA","alg":"HS256"}'
    header = _json_util.json_loads(json_header)
    _jwt_format.validate_header(header, 'HS256')
    _jwt_format.validate_header(header, 'HS256', tink_kid='GsapRA')
    _jwt_format.validate_header(header, 'HS256', custom_kid='GsapRA')
    with self.assertRaises(_jwt_error.JwtInvalidError):
      # fails because tink_kid and custom_kid must not be set at the same time.
      _jwt_format.validate_header(
          header, 'HS256', tink_kid='GsapRA', custom_kid='GsapRA')

  def test_validate_header_with_other_kid_fails(self):
    json_header = '{"kid":"GsapRA","alg":"HS256"}'
    header = _json_util.json_loads(json_header)
    _jwt_format.validate_header(header, 'HS256')
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.validate_header(header, 'HS256', tink_kid='otherKid')
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.validate_header(header, 'HS256', custom_kid='otherKid')

  def test_validate_header_with_missing_kid_missing(self):
    json_header = '{"alg":"HS256"}'
    header = _json_util.json_loads(json_header)
    with self.assertRaises(_jwt_error.JwtInvalidError):
      # if tink_kid is set, a kid header is required.
      _jwt_format.validate_header(header, 'HS256', tink_kid='GsapRA')
    # if custom_kid is set, a kid header is *not* required.
    _jwt_format.validate_header(header, 'HS256', custom_kid='GsapRA')

  def test_get_kid_success(self):
    key_id = 0x1ac6a944
    self.assertEqual(_jwt_format.get_kid(key_id, tink_pb2.TINK), 'GsapRA')
    self.assertIsNone(_jwt_format.get_kid(key_id, tink_pb2.RAW))
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.get_kid(key_id, tink_pb2.LEGACY)

  def test_get_kid_invalid_input_fails(self):
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.get_kid(123, tink_pb2.LEGACY)
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.get_kid(-1, tink_pb2.TINK)
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.get_kid(2**33, tink_pb2.TINK)

  def test_json_decode_encode_payload_fixed_data(self):
    # Example from https://tools.ietf.org/html/rfc7519#section-3.1
    encoded_payload = (b'eyJpc3MiOiJqb2UiLA0KICJleHAiOjEzMDA4MTkzODAsDQogImh0'
                       b'dHA6Ly9leGFtcGxlLmNvbS9pc19yb290Ijp0cnVlfQ')
    json_payload = _jwt_format.decode_payload(encoded_payload)
    payload = _json_util.json_loads(json_payload)
    self.assertEqual(payload['iss'], 'joe')
    self.assertEqual(payload['exp'], 1300819380)
    self.assertEqual(payload['http://example.com/is_root'], True)
    self.assertEqual(
        _jwt_format.decode_payload(_jwt_format.encode_payload(json_payload)),
        json_payload)

  def test_decode_encode_payload(self):
    # NOTE(review): this body is identical to
    # test_json_decode_encode_payload_fixed_data; kept so the existing test
    # name stays available.
    # Example from https://tools.ietf.org/html/rfc7519#section-3.1
    encoded_payload = (b'eyJpc3MiOiJqb2UiLA0KICJleHAiOjEzMDA4MTkzODAsDQogImh0'
                       b'dHA6Ly9leGFtcGxlLmNvbS9pc19yb290Ijp0cnVlfQ')
    json_payload = _jwt_format.decode_payload(encoded_payload)
    payload = _json_util.json_loads(json_payload)
    self.assertEqual(payload['iss'], 'joe')
    self.assertEqual(payload['exp'], 1300819380)
    self.assertEqual(payload['http://example.com/is_root'], True)
    self.assertEqual(
        _jwt_format.decode_payload(_jwt_format.encode_payload(json_payload)),
        json_payload)

  def test_encode_payload_with_utf16_surrogate(self):
    self.assertEqual(
        _jwt_format.encode_payload('{"iss":"\U0001d11e"}'),
        b'eyJpc3MiOiLwnYSeIn0')

  def test_encode_payload_with_invalid_utf16(self):
    # '\uD834' is a lone surrogate.
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.encode_payload('{"iss":"\uD834"}')

  def test_create_unsigned_compact_success(self):
    raw_jwt = _raw_jwt.raw_jwt_from_json(None, '{"iss":"joe"}')
    self.assertEqual(
        _jwt_format.create_unsigned_compact('RS256', None, raw_jwt),
        b'eyJhbGciOiJSUzI1NiJ9.eyJpc3MiOiJqb2UifQ')

  def test_encode_decode_signature_success(self):
    signature = bytes([
        116, 24, 223, 180, 151, 153, 224, 37, 79, 250, 96, 125, 216, 173, 187,
        186, 22, 212, 37, 77, 105, 214, 191, 240, 91, 88, 5, 88, 83, 132, 141,
        121
    ])
    encoded = _jwt_format.encode_signature(signature)
    self.assertEqual(encoded, b'dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk')
    self.assertEqual(_jwt_format.decode_signature(encoded), signature)

  def test_signed_compact_create_split(self):
    raw_jwt = _raw_jwt.raw_jwt_from_json('JWT', '{"iss":"joe"}')
    signature = _jwt_format.decode_signature(
        b'dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk')
    unsigned_compact = _jwt_format.create_unsigned_compact(
        'RS256', None, raw_jwt)
    signed_compact = _jwt_format.create_signed_compact(unsigned_compact,
                                                       signature)
    un_comp, hdr, pay, sig = _jwt_format.split_signed_compact(signed_compact)

    self.assertEqual(
        unsigned_compact,
        b'eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJqb2UifQ')
    self.assertEqual(
        signed_compact, 'eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.'
        'eyJpc3MiOiJqb2UifQ.dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk')
    self.assertEqual(un_comp, unsigned_compact)
    self.assertEqual(sig, signature)
    self.assertEqual(hdr, '{"alg":"RS256","typ":"JWT"}')
    header = _json_util.json_loads(hdr)
    _jwt_format.validate_header(header, 'RS256')
    self.assertEqual(pay, '{"iss":"joe"}')
    self.assertEqual(_jwt_format.get_type_header(header), 'JWT')

  def test_signed_compact_create_split_with_kid(self):
    raw_jwt = _raw_jwt.raw_jwt_from_json(None, '{"iss":"joe"}')
    signature = _jwt_format.decode_signature(
        b'dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk')
    unsigned_compact = _jwt_format.create_unsigned_compact(
        'RS256', 'AZxkm2U', raw_jwt)
    signed_compact = _jwt_format.create_signed_compact(unsigned_compact,
                                                       signature)
    un_comp, hdr, pay, sig = _jwt_format.split_signed_compact(signed_compact)

    self.assertEqual(
        unsigned_compact,
        b'eyJraWQiOiJBWnhrbTJVIiwiYWxnIjoiUlMyNTYifQ.eyJpc3MiOiJqb2UifQ')
    self.assertEqual(
        signed_compact,
        'eyJraWQiOiJBWnhrbTJVIiwiYWxnIjoiUlMyNTYifQ.eyJpc3MiOiJqb2UifQ'
        '.dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk')
    self.assertEqual(un_comp, unsigned_compact)
    self.assertEqual(sig, signature)
    self.assertEqual(hdr, '{"kid":"AZxkm2U","alg":"RS256"}')
    header = _json_util.json_loads(hdr)
    _jwt_format.validate_header(header, 'RS256')
    self.assertEqual(pay, '{"iss":"joe"}')
    self.assertIsNone(_jwt_format.get_type_header(header))

  def test_split_empty_signed_compact(self):
    un_comp, hdr, pay, sig = _jwt_format.split_signed_compact('..')
    self.assertEqual(un_comp, b'.')
    self.assertEmpty(sig)
    self.assertEmpty(hdr)
    self.assertEmpty(pay)

  def test_split_signed_compact_success(self):
    un_comp, hdr, pay, sig = _jwt_format.split_signed_compact('e30.e30.YWJj')
    self.assertEqual(un_comp, b'e30.e30')
    self.assertEqual(sig, b'abc')
    self.assertEqual(hdr, '{}')
    self.assertEqual(pay, '{}')

  def test_split_signed_compact_with_bad_format_fails(self):
    # Inputs that do not consist of exactly three dot-separated parts.
    bad_tokens = (
        'e30.e30.YWJj.abc',
        'e30.e30.YWJj.',
        '.e30.e30.YWJj',
        '.e30.e30.',
        'e30',
        '',
    )
    for token in bad_tokens:
      with self.subTest(token=token), self.assertRaises(
          _jwt_error.JwtInvalidError):
        _jwt_format.split_signed_compact(token)

  def test_split_signed_compact_with_bad_characters_fails(self):
    # Three-part inputs carrying one disallowed character (brace, space,
    # newline, carriage return, '$' or a lone surrogate) in varying positions.
    bad_tokens = (
        '{e30.e30.YWJj',
        ' e30.e30.YWJj',
        'e30. e30.YWJj',
        'e30.e30.YWJj ',
        'e30.e30.\nYWJj',
        'e30.\re30.YWJj',
        'e30$.e30.YWJj',
        'e30.$e30.YWJj',
        'e30.e30.YWJj$',
        'e30.e30.YWJj\ud83c',
    )
    for token in bad_tokens:
      with self.subTest(token=token), self.assertRaises(
          _jwt_error.JwtInvalidError):
        _jwt_format.split_signed_compact(token)

  def test_split_signed_compact_with_invalid_utf8_in_header(self):
    encoded_header = _jwt_format.base64_encode(b'{"alg":"RS256", "bad":"\xc2"}')
    token = (encoded_header + b'.e30.YWJj').decode('utf8')
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.split_signed_compact(token)


if __name__ == '__main__':
  absltest.main()