# Copyright 2020 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Cross-language consistency tests for AEAD. These tests generate different kind of AEAD keysets, some are valid, some are invalid. The test succeeds if all implementation treat the keyset consistently, so either encryption/decryption works as expected, or the keyset is rejected. """ import itertools from typing import List from typing import Tuple from absl.testing import absltest from absl.testing import parameterized import tink from tink.proto import aes_ctr_hmac_aead_pb2 from tink.proto import aes_eax_pb2 from tink.proto import aes_gcm_pb2 from tink.proto import common_pb2 from tink.proto import tink_pb2 import tink_config from util import testing_servers from util import utilities HASH_TYPES = [ common_pb2.UNKNOWN_HASH, common_pb2.SHA1, common_pb2.SHA224, common_pb2.SHA256, common_pb2.SHA384, common_pb2.SHA512 ] def setUpModule(): tink.aead.register() testing_servers.start('aead_consistency') def tearDownModule(): testing_servers.stop() def _gen_keyset( type_url: str, value: bytes, key_material_type: tink_pb2.KeyData.KeyMaterialType) -> tink_pb2.Keyset: """Generates a new Keyset.""" keyset = tink_pb2.Keyset() key = keyset.key.add() key.key_data.type_url = type_url key.key_data.value = value key.key_data.key_material_type = key_material_type key.status = tink_pb2.ENABLED key.key_id = 42 key.output_prefix_type = tink_pb2.TINK keyset.primary_key_id = 42 return keyset def _gen_key_value(size: int) -> bytes: """Returns a fixed key_value of a given size.""" return bytes(i for i in range(size)) def aes_eax_key_test_cases(): def _test_case(key_size=16, iv_size=16, key_version=0): key = aes_eax_pb2.AesEaxKey() key.version = key_version key.key_value = _gen_key_value(key_size) key.params.iv_size = iv_size keyset = _gen_keyset( 'type.googleapis.com/google.crypto.tink.AesEaxKey', key.SerializeToString(), tink_pb2.KeyData.SYMMETRIC) return ('AesEaxKey(%d,%d,%d)' % (key_size, iv_size, key_version), keyset) for key_size in [15, 16, 24, 32, 64, 96]: for iv_size in [11, 12, 16, 17, 24, 32]: yield _test_case(key_size=key_size, iv_size=iv_size) yield _test_case(key_version=1) def aes_gcm_key_test_cases(): def _test_case(key_size=16, key_version=0): key = aes_gcm_pb2.AesGcmKey() key.version = key_version key.key_value = _gen_key_value(key_size) keyset = _gen_keyset( 'type.googleapis.com/google.crypto.tink.AesGcmKey', key.SerializeToString(), tink_pb2.KeyData.SYMMETRIC) return ('AesGcmKey(%d,%d)' % (key_size, key_version), keyset) for key_size in [15, 16, 24, 32, 64, 96]: yield _test_case(key_size=key_size) yield _test_case(key_version=1) def aes_ctr_hmac_aead_key_test_cases(): def _test_case(aes_key_size=16, iv_size=16, hmac_key_size=16, hmac_tag_size=16, hash_type=common_pb2.SHA256, key_version=0, aes_ctr_version=0, hmac_version=0): key = aes_ctr_hmac_aead_pb2.AesCtrHmacAeadKey() key.version = key_version key.aes_ctr_key.version = aes_ctr_version key.aes_ctr_key.params.iv_size = iv_size key.aes_ctr_key.key_value = _gen_key_value(aes_key_size) key.hmac_key.version = hmac_version key.hmac_key.params.tag_size = hmac_tag_size key.hmac_key.params.hash = hash_type key.hmac_key.key_value = _gen_key_value(hmac_key_size) keyset = _gen_keyset( 'type.googleapis.com/google.crypto.tink.AesCtrHmacAeadKey', key.SerializeToString(), tink_pb2.KeyData.SYMMETRIC) return ('AesCtrHmacAeadKey(%d,%d,%d,%d,%s,%d,%d,%d)' % (aes_key_size, iv_size, hmac_key_size, hmac_tag_size, common_pb2.HashType.Name(hash_type), key_version, aes_ctr_version, hmac_version), keyset) yield _test_case() for aes_key_size in [15, 16, 24, 32, 64, 96]: for iv_size in [11, 12, 16, 17, 24, 32]: yield _test_case(aes_key_size=aes_key_size, iv_size=iv_size) for hmac_key_size in [15, 16, 24, 32, 64, 96]: for hmac_tag_size in [9, 10, 16, 20, 21, 24, 32, 33, 64, 65]: yield _test_case(hmac_key_size=hmac_key_size, hmac_tag_size=hmac_tag_size) for hash_type in HASH_TYPES: yield _test_case(hash_type=hash_type) yield _test_case(key_version=1) yield _test_case(aes_ctr_version=1) yield _test_case(hmac_version=1) class AeadKeyConsistencyTest(parameterized.TestCase): """Tests that all implementation treat all generated keyset in the same way. We only consider keyset with single keys. This should be fine, since most inconsistencies between languages will occur in the key validation, and that is done for each key independently. """ def _create_aeads_ignore_errors( self, keyset: tink_pb2.Keyset) -> List[Tuple[tink.aead.Aead, str]]: """Creates AEADs for the given keyset in each language. Args: keyset: A keyset as 'keyset' proto. Returns: A list of pairs (aead, language) """ result = [] for lang in utilities.ALL_LANGUAGES: try: aead = testing_servers.remote_primitive(lang, keyset.SerializeToString(), tink.aead.Aead) result.append((aead, lang)) except tink.TinkError: pass return result @parameterized.parameters( itertools.chain(aes_eax_key_test_cases(), aes_gcm_key_test_cases(), aes_ctr_hmac_aead_key_test_cases())) def test_aead_creation_supported_languages_consistent(self, name, keyset): """Tests that AEAD creation is consistent in all supporeted languages.""" supported_langs = tink_config.supported_languages_for_key_type( tink_config.key_type_from_type_url(keyset.key[0].key_data.type_url)) langs = [lang for _, lang in self._create_aeads_ignore_errors(keyset)] if langs: with self.subTest('When creating AEAD objects for %s' % name): self.assertEqual(langs, supported_langs) @parameterized.parameters( itertools.chain(aes_eax_key_test_cases(), aes_gcm_key_test_cases(), aes_ctr_hmac_aead_key_test_cases())) def test_aead_creation_non_supported_languages_fail(self, name, keyset): """Tests that AEAD creation fails in all unsupported languages.""" supported_langs = tink_config.supported_languages_for_key_type( tink_config.key_type_from_type_url(keyset.key[0].key_data.type_url)) langs = [lang for _, lang in self._create_aeads_ignore_errors(keyset)] for lang in utilities.ALL_LANGUAGES: if lang not in supported_langs: self.assertNotIn( lang, langs, 'AEAD-Creation should fail in language %s for %s' % (lang, name)) @parameterized.parameters( itertools.chain(aes_eax_key_test_cases(), aes_gcm_key_test_cases(), aes_ctr_hmac_aead_key_test_cases())) def test_aead_pairwise_consistency(self, name, keyset): """Tests that created AEADS behave consistently.""" aead_and_lang = self._create_aeads_ignore_errors(keyset) plaintext = b'plaintext' associated_data = b'associated_data' for aead, lang in aead_and_lang: aead0 = aead_and_lang[0][0] lang0 = aead_and_lang[0][1] with self.subTest('Comparing %s-aead to %s-aead for %s ' % (lang0, lang, name)): ciphertext = aead.encrypt(plaintext, associated_data) self.assertEqual(aead0.decrypt(ciphertext, associated_data), plaintext) ciphertext = aead0.encrypt(plaintext, associated_data) self.assertEqual(aead.decrypt(ciphertext, associated_data), plaintext) if __name__ == '__main__': absltest.main()