1# Copyright 2020 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Cross-language tests for the StreamingAead primitive.""" 15 16import io 17 18from absl.testing import absltest 19from absl.testing import parameterized 20 21import tink 22from tink import streaming_aead 23 24from tink.testing import keyset_builder 25from util import testing_servers 26from util import utilities 27 28SUPPORTED_LANGUAGES = (testing_servers 29 .SUPPORTED_LANGUAGES_BY_PRIMITIVE['streaming_aead']) 30 31# About 49KB. 32LONG_PLAINTEXT = b' '.join(b'%d' % i for i in range(10000)) 33 34 35def key_rotation_test_cases(): 36 for enc_lang in SUPPORTED_LANGUAGES: 37 for dec_lang in SUPPORTED_LANGUAGES: 38 yield (enc_lang, dec_lang) 39 40 41def setUpModule(): 42 streaming_aead.register() 43 testing_servers.start('streaming_aead') 44 45 46def tearDownModule(): 47 testing_servers.stop() 48 49 50class StreamingAeadPythonTest(parameterized.TestCase): 51 52 @parameterized.parameters( 53 utilities.tinkey_template_names_for(streaming_aead.StreamingAead)) 54 def test_encrypt_decrypt(self, key_template_name): 55 supported_langs = utilities.SUPPORTED_LANGUAGES_BY_TEMPLATE_NAME[ 56 key_template_name] 57 self.assertNotEmpty(supported_langs) 58 key_template = utilities.KEY_TEMPLATE[key_template_name] 59 # Take the first supported language to generate the keyset. 60 keyset = testing_servers.new_keyset(supported_langs[0], key_template) 61 supported_streaming_aeads = {} 62 for lang in supported_langs: 63 supported_streaming_aeads[lang] = testing_servers.remote_primitive( 64 lang, keyset, streaming_aead.StreamingAead) 65 for lang, p in supported_streaming_aeads.items(): 66 desc = ( 67 b'This is some plaintext message to be encrypted using key_template ' 68 b'%s using %s for encryption.' 69 % (key_template_name.encode('utf8'), lang.encode('utf8'))) 70 plaintext = desc + LONG_PLAINTEXT 71 associated_data = ( 72 b'Some associated data for %s using %s for encryption.' % 73 (key_template_name.encode('utf8'), lang.encode('utf8'))) 74 plaintext_stream = io.BytesIO(plaintext) 75 ciphertext_result_stream = p.new_encrypting_stream( 76 plaintext_stream, associated_data) 77 ciphertext = ciphertext_result_stream.read() 78 for _, p2 in supported_streaming_aeads.items(): 79 ciphertext_stream = io.BytesIO(ciphertext) 80 decrypted_stream = p2.new_decrypting_stream( 81 ciphertext_stream, associated_data) 82 self.assertEqual(decrypted_stream.read(), plaintext) 83 84 @parameterized.parameters(key_rotation_test_cases()) 85 def test_key_rotation(self, enc_lang, dec_lang): 86 # Do a key rotation from an old key to a new key. 87 # Encryption and decryption are done in languages enc_lang and dec_lang. 88 builder = keyset_builder.new_keyset_builder() 89 older_key_id = builder.add_new_key( 90 streaming_aead.streaming_aead_key_templates.AES128_GCM_HKDF_4KB) 91 builder.set_primary_key(older_key_id) 92 enc1 = testing_servers.remote_primitive(enc_lang, builder.keyset(), 93 streaming_aead.StreamingAead) 94 dec1 = testing_servers.remote_primitive(dec_lang, builder.keyset(), 95 streaming_aead.StreamingAead) 96 newer_key_id = builder.add_new_key( 97 streaming_aead.streaming_aead_key_templates.AES256_GCM_HKDF_4KB) 98 enc2 = testing_servers.remote_primitive(enc_lang, builder.keyset(), 99 streaming_aead.StreamingAead) 100 dec2 = testing_servers.remote_primitive(dec_lang, builder.keyset(), 101 streaming_aead.StreamingAead) 102 103 builder.set_primary_key(newer_key_id) 104 enc3 = testing_servers.remote_primitive(enc_lang, builder.keyset(), 105 streaming_aead.StreamingAead) 106 dec3 = testing_servers.remote_primitive(dec_lang, builder.keyset(), 107 streaming_aead.StreamingAead) 108 109 builder.disable_key(older_key_id) 110 enc4 = testing_servers.remote_primitive(enc_lang, builder.keyset(), 111 streaming_aead.StreamingAead) 112 dec4 = testing_servers.remote_primitive(dec_lang, builder.keyset(), 113 streaming_aead.StreamingAead) 114 115 self.assertNotEqual(older_key_id, newer_key_id) 116 # 1 encrypts with the older key. So 1, 2 and 3 can decrypt it, but not 4. 117 plaintext = LONG_PLAINTEXT 118 ad = b'associated_data' 119 ciphertext1 = enc1.new_encrypting_stream(io.BytesIO(plaintext), ad).read() 120 self.assertEqual( 121 dec1.new_decrypting_stream(io.BytesIO(ciphertext1), ad).read(), 122 plaintext) 123 self.assertEqual( 124 dec2.new_decrypting_stream(io.BytesIO(ciphertext1), ad).read(), 125 plaintext) 126 self.assertEqual( 127 dec3.new_decrypting_stream(io.BytesIO(ciphertext1), ad).read(), 128 plaintext) 129 with self.assertRaises(tink.TinkError): 130 _ = dec4.new_decrypting_stream(io.BytesIO(ciphertext1), ad).read() 131 132 # 2 encrypts with the older key. So 1, 2 and 3 can decrypt it, but not 4. 133 ciphertext2 = enc2.new_encrypting_stream(io.BytesIO(plaintext), ad).read() 134 self.assertEqual( 135 dec1.new_decrypting_stream(io.BytesIO(ciphertext2), ad).read(), 136 plaintext) 137 self.assertEqual( 138 dec2.new_decrypting_stream(io.BytesIO(ciphertext2), ad).read(), 139 plaintext) 140 self.assertEqual( 141 dec3.new_decrypting_stream(io.BytesIO(ciphertext2), ad).read(), 142 plaintext) 143 with self.assertRaises(tink.TinkError): 144 _ = dec4.new_decrypting_stream(io.BytesIO(ciphertext2), ad).read() 145 146 # 3 encrypts with the newer key. So 2, 3 and 4 can decrypt it, but not 1. 147 ciphertext3 = enc3.new_encrypting_stream(io.BytesIO(plaintext), ad).read() 148 with self.assertRaises(tink.TinkError): 149 _ = dec1.new_decrypting_stream(io.BytesIO(ciphertext3), ad).read() 150 self.assertEqual( 151 dec2.new_decrypting_stream(io.BytesIO(ciphertext3), ad).read(), 152 plaintext) 153 self.assertEqual( 154 dec3.new_decrypting_stream(io.BytesIO(ciphertext3), ad).read(), 155 plaintext) 156 self.assertEqual( 157 dec4.new_decrypting_stream(io.BytesIO(ciphertext3), ad).read(), 158 plaintext) 159 160 # 4 encrypts with the newer key. So 2, 3 and 4 can decrypt it, but not 1. 161 ciphertext4 = enc4.new_encrypting_stream(io.BytesIO(plaintext), ad).read() 162 with self.assertRaises(tink.TinkError): 163 _ = dec1.new_decrypting_stream(io.BytesIO(ciphertext4), ad).read() 164 self.assertEqual( 165 dec2.new_decrypting_stream(io.BytesIO(ciphertext4), ad).read(), 166 plaintext) 167 self.assertEqual( 168 dec3.new_decrypting_stream(io.BytesIO(ciphertext4), ad).read(), 169 plaintext) 170 self.assertEqual( 171 dec4.new_decrypting_stream(io.BytesIO(ciphertext4), ad).read(), 172 plaintext) 173 174if __name__ == '__main__': 175 absltest.main() 176