xref: /aosp_15_r20/external/tink/python/tink/jwt/_jwt_format_test.py (revision e7b1675dde1b92d52ec075b0a92829627f2c52a5)
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14"""Tests for tink.python.tink.jwt._jwt_format."""
15
16from absl.testing import absltest
17from absl.testing import parameterized
18from tink.proto import tink_pb2
19from tink.jwt import _json_util
20from tink.jwt import _jwt_error
21from tink.jwt import _jwt_format
22from tink.jwt import _raw_jwt
23
24
class JwtFormatTest(parameterized.TestCase):
  """Unit tests for the helpers in the `_jwt_format` module.

  The cases cover base64 encoding/decoding, header and payload
  encoding/decoding, header creation and validation, key-ID (`kid`)
  derivation, and creating/splitting compact serializations.

  Every rejection case expects `_jwt_error.JwtInvalidError`.
  """

  def test_base64_encode_decode_header_fixed_data(self):
    # Example from https://tools.ietf.org/html/rfc7519#section-3.1
    header = bytes([
        123, 34, 116, 121, 112, 34, 58, 34, 74, 87, 84, 34, 44, 13, 10, 32, 34,
        97, 108, 103, 34, 58, 34, 72, 83, 50, 53, 54, 34, 125
    ])
    encoded_header = b'eyJ0eXAiOiJKV1QiLA0KICJhbGciOiJIUzI1NiJ9'
    # Check both directions against the same fixed vector.
    self.assertEqual(_jwt_format.base64_encode(header), encoded_header)
    self.assertEqual(_jwt_format.base64_decode(encoded_header), header)

  def test_base64_encode_decode_payload_fixed_data(self):
    # Example from https://tools.ietf.org/html/rfc7519#section-3.1
    payload = bytes([
        123, 34, 105, 115, 115, 34, 58, 34, 106, 111, 101, 34, 44, 13, 10, 32,
        34, 101, 120, 112, 34, 58, 49, 51, 48, 48, 56, 49, 57, 51, 56, 48, 44,
        13, 10, 32, 34, 104, 116, 116, 112, 58, 47, 47, 101, 120, 97, 109, 112,
        108, 101, 46, 99, 111, 109, 47, 105, 115, 95, 114, 111, 111, 116, 34,
        58, 116, 114, 117, 101, 125
    ])
    encoded_payload = (b'eyJpc3MiOiJqb2UiLA0KICJleHAiOjEzMDA4MTkzODAsDQogImh0'
                       b'dHA6Ly9leGFtcGxlLmNvbS9pc19yb290Ijp0cnVlfQ')
    self.assertEqual(_jwt_format.base64_encode(payload), encoded_payload)
    self.assertEqual(_jwt_format.base64_decode(encoded_payload), payload)

  def test_base64_decode_bad_format_raises_jwt_invalid_error(self):
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.base64_decode(b'aeyJh')

  def test_base64_decode_fails_with_unknown_chars(self):
    # Sanity check first: all 64 characters of the URL-safe base64 alphabet,
    # and the empty input, must decode without error.
    self.assertNotEmpty(
        _jwt_format.base64_decode(
            b'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-')
    )
    self.assertEqual(_jwt_format.base64_decode(b''), b'')
    # These bytes sit directly next to the 'A'-'Z', 'a'-'z' and '0'-'9' ranges
    # in ASCII, so they probe the boundaries of the accepted alphabet.
    # NOTE(review): each input is a single character, so decoding may also be
    # rejected on length alone -- confirm the character check is what fires.
    for bad_input in (b'[', b'@', b'/', b':', b'{'):
      with self.subTest(bad_input=bad_input):
        with self.assertRaises(_jwt_error.JwtInvalidError):
          _jwt_format.base64_decode(bad_input)

  def test_decode_encode_header_hs256(self):
    # Example from https://tools.ietf.org/html/rfc7515#appendix-A.1
    encoded_header = b'eyJ0eXAiOiJKV1QiLA0KICJhbGciOiJIUzI1NiJ9'
    json_header = _jwt_format.decode_header(encoded_header)
    header = _json_util.json_loads(json_header)
    self.assertEqual(header['alg'], 'HS256')
    self.assertEqual(header['typ'], 'JWT')
    # Encoding then decoding again must give back the same JSON text.
    self.assertEqual(
        _jwt_format.decode_header(_jwt_format.encode_header(json_header)),
        json_header)

  def test_decode_encode_header_rs256(self):
    # Example from https://tools.ietf.org/html/rfc7515#appendix-A.2
    encoded_header = b'eyJhbGciOiJSUzI1NiJ9'
    json_header = _jwt_format.decode_header(encoded_header)
    header = _json_util.json_loads(json_header)
    self.assertEqual(header['alg'], 'RS256')
    self.assertEqual(
        _jwt_format.decode_header(_jwt_format.encode_header(json_header)),
        json_header)

  def test_encode_decode_header(self):
    # The leading/inner/trailing spaces must survive the round trip verbatim.
    encoded_header = _jwt_format.encode_header('{ "alg": "RS256"} ')
    json_header = _jwt_format.decode_header(encoded_header)
    self.assertEqual(json_header, '{ "alg": "RS256"} ')

  def test_decode_header_with_invalid_utf8(self):
    # b'\xc2' on its own is a truncated UTF-8 sequence.
    encoded_header = _jwt_format.base64_encode(b'{"alg":"RS256", "bad":"\xc2"}')
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.decode_header(encoded_header)

  def test_encode_header_with_utf16_surrogate(self):
    # U+1D11E lies outside the Basic Multilingual Plane (a surrogate pair in
    # UTF-16); encoding it must succeed.
    self.assertEqual(
        _jwt_format.encode_header('{"alg": "RS256", "a":"\U0001d11e"}'),
        b'eyJhbGciOiAiUlMyNTYiLCAiYSI6IvCdhJ4ifQ')

  def test_encode_header_with_invalid_utf16_character(self):
    # '\uD834' is a lone high surrogate without its low counterpart.
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.encode_header('{"alg": "RS256", "a":"\uD834"}')

  @parameterized.parameters([
      'HS256', 'HS384', 'HS512', 'ES256', 'ES384', 'ES512', 'RS256', 'RS384',
      'RS512', 'PS256', 'PS384', 'PS512'
  ])
  def test_create_validate_header(self, algorithm):
    # A header created with neither type nor kid must validate for the same
    # algorithm and must carry no type header.
    encoded_header = _jwt_format.create_header(algorithm, None, None)
    json_header = _jwt_format.decode_header(encoded_header)
    header = _json_util.json_loads(json_header)
    _jwt_format.validate_header(header, algorithm)
    self.assertIsNone(_jwt_format.get_type_header(header))

  def test_create_header_with_type(self):
    encoded_header = _jwt_format.create_header('HS256', 'typeHeader', None)
    json_header = _jwt_format.decode_header(encoded_header)
    # The exact serialization (field order, no whitespace) is pinned here.
    self.assertEqual(json_header, '{"alg":"HS256","typ":"typeHeader"}')
    header = _json_util.json_loads(json_header)
    _jwt_format.validate_header(header, 'HS256')
    self.assertEqual(_jwt_format.get_type_header(header), 'typeHeader')

  def test_create_header_with_type_and_kid(self):
    encoded_header = _jwt_format.create_header('HS256', 'typeHeader', 'GsapRA')
    json_header = _jwt_format.decode_header(encoded_header)
    # With a kid present, "kid" is expected to be serialized first.
    self.assertEqual(json_header,
                     '{"kid":"GsapRA","alg":"HS256","typ":"typeHeader"}')
    header = _json_util.json_loads(json_header)
    _jwt_format.validate_header(header, 'HS256')
    self.assertEqual(_jwt_format.get_type_header(header), 'typeHeader')

  def test_create_header_with_unknown_alg_fails(self):
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.create_header('unknown', None, None)

  def test_create_verify_different_algorithms_fails(self):
    encoded_header = _jwt_format.create_header('HS256', None, None)
    json_header = _jwt_format.decode_header(encoded_header)
    header = _json_util.json_loads(json_header)
    # The header says HS256, so validating it as ES256 must be rejected.
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.validate_header(header, 'ES256')

  def test_verify_empty_header_fails(self):
    header = _json_util.json_loads('{}')
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.validate_header(header, 'ES256')

  def test_validate_header_with_unknown_algorithm_fails(self):
    # Header and expected algorithm agree, but 'HS123' itself must be refused.
    header = _json_util.json_loads('{"alg":"HS123"}')
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.validate_header(header, 'HS123')

  def test_validate_header_with_unknown_entry_success(self):
    header = _json_util.json_loads('{"alg":"HS256","unknown":"header"}')
    _jwt_format.validate_header(header, 'HS256')

  def test_validate_header_ignores_typ(self):
    header = _json_util.json_loads('{"alg":"HS256","typ":"unknown"}')
    _jwt_format.validate_header(header, 'HS256')

  def test_validate_header_rejects_crit(self):
    header = _json_util.json_loads(
        '{"alg":"HS256","crit":["http://example.invalid/UNDEFINED"],'
        '"http://example.invalid/UNDEFINED":true}')
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.validate_header(header, 'HS256')

  def test_validate_header_with_kid_success(self):
    json_header = '{"kid":"GsapRA","alg":"HS256"}'
    header = _json_util.json_loads(json_header)
    # A matching kid is accepted whether it is not checked at all, supplied as
    # tink_kid, or supplied as custom_kid.
    _jwt_format.validate_header(header, 'HS256')
    _jwt_format.validate_header(header, 'HS256', tink_kid='GsapRA')
    _jwt_format.validate_header(header, 'HS256', custom_kid='GsapRA')
    with self.assertRaises(_jwt_error.JwtInvalidError):
      # fails because tink_kid and custom_kid must not be set at the same time.
      _jwt_format.validate_header(
          header, 'HS256', tink_kid='GsapRA', custom_kid='GsapRA')

  def test_validate_header_with_other_kid_fails(self):
    json_header = '{"kid":"GsapRA","alg":"HS256"}'
    header = _json_util.json_loads(json_header)
    _jwt_format.validate_header(header, 'HS256')
    # A kid that differs from the header's value is rejected in both modes.
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.validate_header(header, 'HS256', tink_kid='otherKid')
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.validate_header(header, 'HS256', custom_kid='otherKid')

  def test_validate_header_with_missing_kid_missing(self):
    json_header = '{"alg":"HS256"}'
    header = _json_util.json_loads(json_header)
    with self.assertRaises(_jwt_error.JwtInvalidError):
      # if tink_kid is set, a kid header is required.
      _jwt_format.validate_header(header, 'HS256', tink_kid='GsapRA')
    # if custom_kid is set, a kid header is *not* required.
    _jwt_format.validate_header(header, 'HS256', custom_kid='GsapRA')

  def test_get_kid_success(self):
    key_id = 0x1ac6a944
    # 'GsapRA' is the unpadded URL-safe base64 of the bytes 1a c6 a9 44.
    self.assertEqual(_jwt_format.get_kid(key_id, tink_pb2.TINK), 'GsapRA')
    # RAW is expected to yield no kid at all.
    self.assertIsNone(_jwt_format.get_kid(key_id, tink_pb2.RAW))
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.get_kid(key_id, tink_pb2.LEGACY)

  def test_get_kid_invalid_input_fails(self):
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.get_kid(123, tink_pb2.LEGACY)
    # A negative ID and an ID that does not fit in 32 bits are both rejected.
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.get_kid(-1, tink_pb2.TINK)
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.get_kid(2**33, tink_pb2.TINK)

  def test_json_decode_encode_payload_fixed_data(self):
    # Example from https://tools.ietf.org/html/rfc7519#section-3.1
    encoded_payload = (b'eyJpc3MiOiJqb2UiLA0KICJleHAiOjEzMDA4MTkzODAsDQogImh0'
                       b'dHA6Ly9leGFtcGxlLmNvbS9pc19yb290Ijp0cnVlfQ')
    json_payload = _jwt_format.decode_payload(encoded_payload)
    payload = _json_util.json_loads(json_payload)
    self.assertEqual(payload['iss'], 'joe')
    self.assertEqual(payload['exp'], 1300819380)
    self.assertEqual(payload['http://example.com/is_root'], True)
    self.assertEqual(
        _jwt_format.decode_payload(_jwt_format.encode_payload(json_payload)),
        json_payload)

  def test_decode_encode_payload(self):
    # NOTE(review): this body is identical to
    # test_json_decode_encode_payload_fixed_data; consider merging the two or
    # giving this one a distinct input.
    # Example from https://tools.ietf.org/html/rfc7519#section-3.1
    encoded_payload = (b'eyJpc3MiOiJqb2UiLA0KICJleHAiOjEzMDA4MTkzODAsDQogImh0'
                       b'dHA6Ly9leGFtcGxlLmNvbS9pc19yb290Ijp0cnVlfQ')
    json_payload = _jwt_format.decode_payload(encoded_payload)
    payload = _json_util.json_loads(json_payload)
    self.assertEqual(payload['iss'], 'joe')
    self.assertEqual(payload['exp'], 1300819380)
    self.assertEqual(payload['http://example.com/is_root'], True)
    self.assertEqual(
        _jwt_format.decode_payload(_jwt_format.encode_payload(json_payload)),
        json_payload)

  def test_encode_payload_with_utf16_surrogate(self):
    # U+1D11E lies outside the Basic Multilingual Plane; encoding must succeed.
    self.assertEqual(
        _jwt_format.encode_payload('{"iss":"\U0001d11e"}'),
        b'eyJpc3MiOiLwnYSeIn0')

  def test_encode_payload_with_invalid_utf16(self):
    # '\uD834' is a lone high surrogate without its low counterpart.
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.encode_payload('{"iss":"\uD834"}')

  def test_create_unsigned_compact_success(self):
    raw_jwt = _raw_jwt.raw_jwt_from_json(None, '{"iss":"joe"}')
    # Expected form: <encoded header>.<encoded payload>, with no signature part.
    self.assertEqual(
        _jwt_format.create_unsigned_compact('RS256', None, raw_jwt),
        b'eyJhbGciOiJSUzI1NiJ9.eyJpc3MiOiJqb2UifQ')

  def test_encode_decode_signature_success(self):
    signature = bytes([
        116, 24, 223, 180, 151, 153, 224, 37, 79, 250, 96, 125, 216, 173, 187,
        186, 22, 212, 37, 77, 105, 214, 191, 240, 91, 88, 5, 88, 83, 132, 141,
        121
    ])
    encoded = _jwt_format.encode_signature(signature)
    self.assertEqual(encoded, b'dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk')
    self.assertEqual(_jwt_format.decode_signature(encoded), signature)

  def test_signed_compact_create_split(self):
    raw_jwt = _raw_jwt.raw_jwt_from_json('JWT', '{"iss":"joe"}')
    signature = _jwt_format.decode_signature(
        b'dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk')
    unsigned_compact = _jwt_format.create_unsigned_compact(
        'RS256', None, raw_jwt)
    signed_compact = _jwt_format.create_signed_compact(unsigned_compact,
                                                       signature)
    un_comp, hdr, pay, sig = _jwt_format.split_signed_compact(signed_compact)

    # The unsigned compact form is compared as bytes, the signed one as str.
    self.assertEqual(
        unsigned_compact,
        b'eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJqb2UifQ')
    self.assertEqual(
        signed_compact, 'eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.'
        'eyJpc3MiOiJqb2UifQ.dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk')
    # Splitting must recover exactly the parts that were put together.
    self.assertEqual(un_comp, unsigned_compact)
    self.assertEqual(sig, signature)
    self.assertEqual(hdr, '{"alg":"RS256","typ":"JWT"}')
    header = _json_util.json_loads(hdr)
    _jwt_format.validate_header(header, 'RS256')
    self.assertEqual(pay, '{"iss":"joe"}')
    self.assertEqual(_jwt_format.get_type_header(header), 'JWT')

  def test_signed_compact_create_split_with_kid(self):
    raw_jwt = _raw_jwt.raw_jwt_from_json(None, '{"iss":"joe"}')
    signature = _jwt_format.decode_signature(
        b'dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk')
    unsigned_compact = _jwt_format.create_unsigned_compact(
        'RS256', 'AZxkm2U', raw_jwt)
    signed_compact = _jwt_format.create_signed_compact(unsigned_compact,
                                                       signature)
    un_comp, hdr, pay, sig = _jwt_format.split_signed_compact(signed_compact)

    self.assertEqual(
        unsigned_compact,
        b'eyJraWQiOiJBWnhrbTJVIiwiYWxnIjoiUlMyNTYifQ.eyJpc3MiOiJqb2UifQ')
    self.assertEqual(
        signed_compact,
        'eyJraWQiOiJBWnhrbTJVIiwiYWxnIjoiUlMyNTYifQ.eyJpc3MiOiJqb2UifQ'
        '.dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk')
    self.assertEqual(un_comp, unsigned_compact)
    self.assertEqual(sig, signature)
    self.assertEqual(hdr, '{"kid":"AZxkm2U","alg":"RS256"}')
    header = _json_util.json_loads(hdr)
    _jwt_format.validate_header(header, 'RS256')
    self.assertEqual(pay, '{"iss":"joe"}')
    # The raw JWT was built without a type, so no type header is expected.
    self.assertIsNone(_jwt_format.get_type_header(header))

  def test_split_empty_signed_compact(self):
    # Three empty parts separated by two dots are accepted by the splitter.
    un_comp, hdr, pay, sig = _jwt_format.split_signed_compact('..')
    self.assertEqual(un_comp, b'.')
    self.assertEmpty(sig)
    self.assertEmpty(hdr)
    self.assertEmpty(pay)

  def test_split_signed_compact_success(self):
    # 'e30' decodes to '{}' and 'YWJj' to b'abc', as asserted below.
    un_comp, hdr, pay, sig = _jwt_format.split_signed_compact('e30.e30.YWJj')
    self.assertEqual(un_comp, b'e30.e30')
    self.assertEqual(sig, b'abc')
    self.assertEqual(hdr, '{}')
    self.assertEqual(pay, '{}')

  def test_split_signed_compact_with_bad_format_fails(self):
    # Every input here has a number of dot-separated parts other than three.
    bad_tokens = (
        'e30.e30.YWJj.abc',
        'e30.e30.YWJj.',
        '.e30.e30.YWJj',
        '.e30.e30.',
        'e30',
        '',
    )
    for token in bad_tokens:
      with self.subTest(token=token):
        with self.assertRaises(_jwt_error.JwtInvalidError):
          _jwt_format.split_signed_compact(token)

  def test_split_signed_compact_with_bad_characters_fails(self):
    # Each token has three parts but one character outside the URL-safe base64
    # alphabet, placed in varying positions.
    bad_tokens = (
        '{e30.e30.YWJj',
        ' e30.e30.YWJj',
        'e30. e30.YWJj',
        'e30.e30.YWJj ',
        'e30.e30.\nYWJj',
        'e30.\re30.YWJj',
        'e30$.e30.YWJj',
        'e30.$e30.YWJj',
        'e30.e30.YWJj$',
        'e30.e30.YWJj\ud83c',
    )
    for token in bad_tokens:
      with self.subTest(token=token):
        with self.assertRaises(_jwt_error.JwtInvalidError):
          _jwt_format.split_signed_compact(token)

  def test_split_signed_compact_with_invalid_utf8_in_header(self):
    # The header part is valid base64, but its decoded bytes end in a
    # truncated UTF-8 sequence (b'\xc2').
    encoded_header = _jwt_format.base64_encode(b'{"alg":"RS256", "bad":"\xc2"}')
    token = (encoded_header + b'.e30.YWJj').decode('utf8')
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.split_signed_compact(token)
375
376
# Only run the tests when this file is executed directly, not when imported.
if __name__ == '__main__':
  absltest.main()
379